2 分钟阅读

前言

Spark Streaming 是将实时数据流按时间分段后,当作小的批处理数据去计算。那么 Flink 则相反,一开始就是按照流处理计算去设计的。当把从文件系统(HDFS)中读入的数据也当做数据流看待,他就变成批处理系统了。

《大数据处理实战》spark RDD 是否就是完美无缺的呢?显然不是,它还是很底层,不方便开发者使用,而且用 RDD API 写的应用程序需要大量的人工调优来提高性能。Spark SQL 提供的 DataFrame/DataSet API 就解决了这个问题,它提供类似 SQL 的查询接口,把数据看成关系型数据库的表,提升了熟悉关系型数据库的开发者的工作效率。Spark 是怎样支持流处理的呢?Spark Streaming 和新的 Structured Streaming,其中 Structured Streaming 也可以使用 DataSet/DataFrame API,这就实现了 Spark 批流处理的统一。Spark 有什么缺点?流处理的实时性还不够,所以无法用在一些对实时性要求很高的流处理场景中。这是因为 Spark 的流处理是基于所谓微批处理(Micro-batch processing)的思想,即它把流处理看作是批处理的一种特殊形式,每次接收到一个时间间隔的数据才会去处理,所以天生很难在实时性上有所提升。想要在流处理的实时性上提升,就不能继续用微批处理的模式,而要想办法实现真正的流处理,即每当有一条数据输入就立刻处理,不做等待。

流式处理系统的演变

聊到现代流式数据处理, 谷歌大神 Tyler Akidau 在2015年8月关于流式计算写了两篇博客(Streaming 101: 批处理之上的世界(一), Streaming 102: 批处理之上的世界(二)(上), Streaming 102: 批处理之上的世界(二)(下), 【总结】批处理之上的世界(二)),后来写了《Streaming System》一书。它探索了四个问题:

  1. 【What】流式数据处理系统计算出什么结果?结果由pipeline的转换器决定。转换器好比MapReduce中的Mapper、Reducer函数,Spark中的transform算子。
  2. 【where】流式数据的结果在哪里计算?流式数据由事件构成,根据事件时间,流式数据可以切分成一个个窗口,把无界数据变成有界数据,计算在窗口中完成。
  3. 【when】计算结果何时输出?水位线或触发器触发输出。水位线表示属于某个窗口时间范围的事件全部到达,如果需要在水位线之前输出结果,或者水位线之后,还有迟到的事件需要计算,需要触发器的帮助。
  4. 【How】如果修正计算结果?一个窗口的结果会被计算多次。每次计算结果可以独立地发送到下游,也可以更新之前计算的结果,还可以把之前的结果丢弃,再发送新的结果。

我们需要一个流式系统,能够达成三点目标:

  1. 数据的正确性,“正好一次”的数据处理机制,要做到这一点,我们需要在流式数据系统里,内置一个数据去重的机制。
  2. 系统的容错能力,把计算节点需要使用的“状态”信息持久化下来。这样,我们才能够做到真正的容错,而不是在系统出错的时候丢失一部分信息。而且,这个机制也有助于我们在线扩容。
  3. 对于时间窗口的正确处理,也就是能够准确地根据事件时间生成报表,而不是简单地使用进行处理的服务器的本地时间。并且,还需要能够考虑到分布式集群中,数据的传输可能会有延时的情况出现。把流式数据处理的时间窗口,以及触发机制内置到流式处理系统里面去。这样,我们就可以让我们的业务代码,专注于实现业务逻辑,而不是需要自己在应用代码里,搞一套时间窗口的维护和触发机制。

我们希望能有一个流式处理系统,帮助我们解决这些问题。我们并不希望,自己在写 Storm 的 Spout 代码的时候,写上一大堆代码,来解决正好一次的数据处理、Spout 中间状态的持久化,以及针对时间窗口的处理逻辑。因为这些问题,是流式数据处理的共性问题。

dataflow模型

2015 年,Google发表了一篇《The Dataflow Model》的论文。 Dataflow(三):一个统一的编程模型

Dataflow 的核心计算模型非常简单,它只有两个概念

  1. 一个叫做 ParDo,顾名思义,也就是并行处理的意思。地位相当于 MapReduce 里的 Map 阶段。所有的输入数据,都会被一个 DoFn,也就是处理函数处理。但是这些数据,不是在一台服务器上处理的,而是和 MapReduce 一样,会在很多台机器上被并行处理。只不过 MapReduce 里的数据处理,只有一个 Map 阶段和一个 Reduce 阶段。而在 Dataflow 里,Pardo 会和下面的 GroupByKey 组合起来,可以有很多层,就好像是很多个 MapReduce 串在一起一样。
  2. 另一个叫做 GroupByKey,也就是按照 Key 进行分组数据处理的问题。地位则是 MapReduce 里的 Shuffle 操作。在 Dataflow 里,所有的数据都被抽象成了 key-value 对。前面的 ParDo 的输入和 Map 函数一样,是一个 key-value 对,输出也是一系列的 key-value 对。而 GroupByKey,则是把相同的 Key 汇总到一起,然后再通过一个 ParDo 下的 DoFn 进行处理。

比如,我们有一个不断输入的日志流,想要统计所有广告展示次数超过 100 万次的广告。那么,我们可以先通过一个 Pardo 解析日志,然后输出(广告 ID,1)这样的 key-value 对,通过 GroupByKey,把相同的广告 ID 的数据分组到一起。然后再通过一个 ParDo,并行统计每一个广告 ID 下的展示次数。最后再通过一个 ParDo,过滤掉所有展示次数少于 100 万次的广告就好了。

流批一体:在 MapReduce 的计算模型下,会有哪些输入数据,是在 MapReduce 的任务开始之前就确定的。这意味着数据从 Map 端被 Shuffle 到 Reduce 端,只依赖于我们的 CPU、网络这些硬件处理能力。而在 Dataflow 里,输入的数据集是无边界的,随着时间的推移,不断会有新的输入数据加入进来。如果从这个角度来思考,那么我们之前把大数据处理分成批处理和流式处理,其实并没有找到两种数据处理的核心差异。因为,对于一份预先确定、边界明确的数据,我们一样可以使用流式处理。比如,我们可以把一份固定大小日志,放到 Kakfa 里,重放一遍给一个 Storm 的 Topology 来处理,那也是流式处理,但这是处理的有边界的数据。而对于不断增长的实时数据,我们一样可以不断定时执行 MapReduce 这样的批处理任务,或者通过 Spark Streaming 这样看起来是流式处理,其实是微批(Mini-Batch)的处理方式。

在 Dataflow 的论文里,Google 把整个大数据的流式处理,抽象成了三个概念。

  1. 是对于乱序数据,能够按照事件发生时间计算时间窗口的模型。在 Dataflow 模型里,需要的不只是 GroupByKey,实际在统计数据的时候,往往需要的是 GroupByKeyAndWindow。统计一个不考虑任何时间窗口的数据,往往是没有意义的。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而 AssignWindows 要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。需要注意,一个事件不只可以分配给一个时间窗口,而是可以分配给多个时间窗口。比如,我们有一个广告在 12:01 展示给了用户,但是我们统计的是“过去 2 分钟的广告展示”,那么这个事件,就会被分配给[12:00, 12:02) 和[12:01, 12:03) 两个时间窗口,我们原先一条的事件就可以变成多条记录。有了 Window 的信息之后,如果我们想要按照固定窗口或者滑动窗口统计数据,我们可以很容易地根据 Key+Window 进行聚合,完成相应的计算。窗口的分配和合并功能,就使得 Dataflow 可以处理乱序数据。相同的数据以不同的顺序到达我们的计算节点,计算的结果仍然是相同的。
  2. 根据数据处理的多维度特征,来决定计算结果什么时候输出的触发器模型。我们会遇到延时、容错等情况,所以我们还需要有一个机制告诉我们,在什么时候数据都已经到了,我们可以把计算结果向下游输出了。在 Dataflow 里,除了内置的基于水位信息的完成度触发器,它还能够支持基于处理时间、记录数等多个参数组合触发。而且用户可以实现自定义触发器,完全根据自己的需要来实现触发器逻辑。
  3. 能够把数据的更新和撤回,与前面的窗口模型和触发器模型集成的增量处理策略。

使用

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds 
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface 
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<KafkaEvent> input = env .addSource(
            new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties()) 
                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper())
        .shuffle();

        input.addSink(
            new FlinkKafkaProducer<>(
            parameterTool.getRequired("output-topic"),
            new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()), 
            parameterTool.getProperties(), 
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
        env.execute("Modern Kafka Example"); 
    }    
}

理论基础来自谷歌论文 《The dataflow model:A pracitical approach to balancing correctness,latency, and cost in massive-scale,unbounded,out-of-order data processing》

  1. 数据从上一个Operation节点直接Push到下一个Operation节点。
  2. 各节点可以分布在不同的Task线程中运行,数据在Operation之间传递。
  3. 具有Shuffle过程,数据从上游Operation push 到下游Operation,不像MapReduce模型,Reduce从Map端拉取数据。
  4. 实现框架有ApacheStorm和ApacheFlink以及ApacheBeam。

DataStream API

  1. source
  2. sink
  3. 转换操作
    1. 基于单条记录filter/map
    2. 基于窗口 window
    3. 多流合并 union join connect
    4. 单流切分 split
  4. DataStream 之间的转换

设计

Flink 中最核心的数据结构是 Stream,它代表一个运行在多个分区上的并行流。在 Stream 上同样可以进行各种转换操作(Transformation)。与 Spark 的 RDD 不同的是,Stream 代表一个数据流而不是静态数据的集合。所以,它包含的数据是随着时间增长而变化的。而且 Stream 上的转换操作都是逐条进行的,即每当有新的数据进来,整个流程都会被执行并更新结果。

flink采用了基于操作符(Operator)的连续流模型,当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow,Streaming Dataflow 包括 Stream 和 Operator(操作符)。转换操作符把一个或多个 Stream 转换成多个 Stream。每个 Dataflow 都有一个输入数据源(Source)和输出数据源(Sink)。与 Spark 的 RDD 转换图类似,Streaming Dataflow 也会被组合成一个有向无环图去执行。在 Flink 中,程序天生是并行和分布式的。一个 Stream 可以包含多个分区(Stream Partitions),一个操作符可以被分成多个操作符子任务,每一个子任务是在不同的线程或者不同的机器节点中独立执行的。

实现

浓浓的tf 提交dag的味道,区别是flink 和spark 一样是集群先跑起来,再提交任务。中间设计到graph 拆分为task 、调度task 到具体TaskManager的过程。

  1. JobManager 管理节点,每个集群至少一个,管理整个集群计算资源,Job管理与调度执行,以及 Checkpoint 协调。
  2. TaskManager :每个集群有多个TM ,负责计算资源提供。
  3. Client :本地执行应用 main() 方法解析JobGraph 对象,并最终将JobGraph 提交到 JobManager 运行,同时监控Job执行的状态。

dispatcher 任务调度

  1. 一个jobGraph 对应一个 jobManager(JobMaster) 双层资源调度
  2. cluster -> job, slotManager 会给 jobGraph 对应的jobManager 分配多个slot (slotPool)
  3. job -> task, 单个slot 可以用于一个或多个task 执行; 但相同的task 不能在一个slot 中运行

Flink 四种 Graph 转换

  1. 第一层: Program -> StreamGraph。算子之间的拓扑关系。
  2. 第二层: StreamGraph -> JobGraph。 不涉及数据跨节点交换 的Operation 会组成 OperatorChain(最终在一个task 里运行)
  3. 第三层: JobGraph -> ExecutionGraph
  4. 第四层: Execution -> 物理执行计划

Flink 是一个面向有限流和无限流有状态计算的分布式计算框架,它能够支持流处理和批处理两种应用类型。在传统意义上,Flink 是一个无限的数据流。但如果我们用一个个的时间窗口把无限的数据流进行切分,我们就得到很多有限数据流,对 Flink 来说,批式数据只是流式数据的一种特例。无论是无限数据流还是有限处理流,Flink 都可以通过同一种 API、同一套代码进行处理之后,服务下游的数据。这样的流程也可以极大地减少工程师的学习和维护成本。可以说,Flink 无论是从上层的代码层面、SDK 层面、API 层面,还是下层的调度器层面,都是针对流批一体的整体架构来进行设计的,是可以从上至下完整地支持流批一体的数据处理引擎。

Flink 提供的两个核心 API 就是 DataSet API 和 DataStream API。你没看错,名字和 Spark 的 DataSet、DataFrame 非常相似。顾名思义,DataSet 代表有界的数据集,而 DataStream 代表流数据。Flink 这样基于流的模型是怎样支持批处理的?在内部,DataSet 其实也用 Stream 表示,静态的有界数据也可以被看作是特殊的流数据,而且 DataSet 与 DataStream 可以无缝切换。所以,Flink 的核心是 DataStream。

如果要进行流计算,Flink 会初始化一个流执行环境 StreamExecutionEnvironment,然后利用这个执行环境构建数据流 DataStream。

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

如果要进行批处理计算,Flink 会初始化一个批处理执行环境 ExecutionEnvironment,然后利用这个环境构建数据集 DataSet。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet text = env.readTextFile("/path/to/file");

然后在 DataStream 或者 DataSet 上执行各种数据转换操作(transformation),这点很像 Spark。不管是流处理还是批处理,Flink 运行时的执行引擎是相同的,只是数据源不同而已。

演进——流批一体

上图是一个非常典型,业界应用也很多的数据链路图。这个数据链路是典型的 Lamda 架构,整个数据链路分为批式计算链路和流式计算链路。在字节跳动内部,通常需要批式计算和流式计算两条链路共同服务于下游的应用。

  1. 在批式计算链路中,我们主要应用 Spark 引擎,通过 Spark 引擎在批式存储中拿到数据,经过 ETL 的计算后,存入下游的存储,从而服务下游的应用。
  2. 流式计算链路,也是我们整个实时推荐、实时信息流的核心链路。我们会通过消息中心件把实时数据进行缓存存入,然后运用 Flink 实时计算引擎进行处理,处理后经过消息中间件的缓存传输存入下游的存储,来服务下层的应用。

整个计算架构分成两条链路,带来了两个比较严重的问题:

  1. 计算不同源
    1. 维护成本高。批式计算主要使用 Spark 引擎,流式计算使用 Flink 引擎。维护两套引擎就意味着使用两套代码,工程师的维护成本和学习成本都非常高。
    2. 数据一致性和质量难以保障。两套代码之间不能相互复用,所以数据的一致性和数据的质量难以保障。
    3. 无法混合调度造成资源浪费。批式计算和流式计算的高峰期是不同的。对流式计算来说,用户的使用高峰期一般是白天或凌晨12点之前,那么这些时间段也就是流式计算的高峰,此时对计算资源的需求是非常高的。相对而言,批式计算对运算时间并没有严格的限制,比如可以在凌晨12点之后到早上6、7点之间进行大量运算。所以,如果流式计算和批式计算的资源无法进行混合调度,那么就无法对运算资源进行错峰使用,造成资源的巨大浪费。
  2. 存储不同源
    1. 数据不一致,维护成本高。如果两条链路同时服务于下游应用的话,那么两套存储系统也是分隔开的,依然存在数据不一致的问题。同时,维护流式、批式两套存储系统的成本也非常高。

那么,什么是流批一体呢?

  1. 从计算层面来讲,就是用同一个引擎、同一套代码及同样的 API ,同时处理有限的数据流和无限的数据流,同时应对在线处理和离线处理(其中有限数据的处理对应离线处理,而无限数据的处理则对应在线处理),达到降本增效的目的。
  2. 在存储方面,流批一体即存储系统能够同时满足流式数据和批式数据的存储,并能够有效地进行协同以及元数据信息的更新。

无论是流式数据还是批式数据,都可以直接或经过简单加工后存入统一存储中。而后,使用流批一体统一的计算引擎进行 ETL 计算,再服务下游的应用。由此,整个流批一体的架构实质上实现了计算同源和存储同源。

  1. 计算同源。用一套代码、一套逻辑去处理流式任务和批式任务,达到了降本增效的目的,同时也大幅提升了资源利用率。
  2. 存储同源。在存储方面统一存储,避免了存储资源的浪费,同时也在很大的程度上避免了数据不一致。

在字节跳动,我们使用 Flink 作为流批一体统一的计算引擎,Iceberg 作为流批一体统一的存储方式。

深入浅出流批一体理论篇——数据架构的演进 未读。

其它

流式处理,支持一条一条处理, 对于扩展operation,spark/flink client 会把main代码 和 jar 打包成一个package 上传到Master,jobGraph上附带job 相关的自定义jar信息, taskManager 执行task 前下载 相应的jar (然后由classloader)加载执行,而tf 因为其特殊性 就只能自定义op了。

Flink 新一代流计算的阶段总结 未读。

蚂蚁实时计算团队的AntFlink提交攻坚之路 如何减少提交任务的耗时? Blink提交采用进程模型(包装flink info/run命令)进行作业执行计划的生成和作业的提交,这个基本是大数据计算引擎jstorm/spark/flink的共识,采用该方式的优点在于:

  1. 简单:
    1. 用户只需在自己的jar包中进行逻辑处理
    2. 引擎client负责以方法调用形式调用用户main方法,然后编译、提交
  2. 干净
    1. 进程模型用户包用完销毁,引擎版本包通过目录隔离,不用考虑多版本问题。

但这也带来了缺点,每次都得走一遍大量class 加载、校验等jvm启动全流程。同时,大多数作业的的执行计划生成耗时是在20秒以内,也就是说此时瓶颈不在编译阶段,此时jvm启动开销就成为了瓶颈。尤其当这些操作极其高频时,带来的开销不容小视。

留下评论