技术

《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go 内存管理 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 容器日志采集 Kubernetes 控制器模型 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 jib源码分析之细节 线程排队 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 回溯法 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup Linux网络源代码学习——整体介绍 zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

特征平台 实时训练 分布式链路追踪 helm tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 机器学习中的python调用c 机器学习训练框架概述 embedding的原理及实践 tensornet源码分析 大模型训练 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 在离线业务混部 RNN pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 tensorflow学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台 tf-operator源码分析 k8s批处理调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF 生命周期管理 openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Kubernetes webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 controller 组件介绍 openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 serverless 泛谈 概率论 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念入门 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 用户登陆 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 当我在说模板引擎的时候,我在说什么 用户认证问题 资源的分配与回收——池 消息/任务队列

标签


Flink学习

2022年03月20日

前言

Spark Streaming 是将实时数据流按时间分段后,当作小的批处理数据去计算。那么 Flink 则相反,一开始就是按照流处理计算去设计的。当把从文件系统(HDFS)中读入的数据也当做数据流看待,他就变成批处理系统了。

《大数据处理实战》spark RDD 是否就是完美无缺的呢?显然不是,它还是很底层,不方便开发者使用,而且用 RDD API 写的应用程序需要大量的人工调优来提高性能。Spark SQL 提供的 DataFrame/DataSet API 就解决了这个问题,它提供类似 SQL 的查询接口,把数据看成关系型数据库的表,提升了熟悉关系型数据库的开发者的工作效率。Spark 是怎样支持流处理的呢?Spark Streaming 和新的 Structured Streaming,其中 Structured Streaming 也可以使用 DataSet/DataFrame API,这就实现了 Spark 批流处理的统一。Spark 有什么缺点?流处理的实时性还不够,所以无法用在一些对实时性要求很高的流处理场景中。这是因为 Spark 的流处理是基于所谓微批处理(Micro-batch processing)的思想,即它把流处理看作是批处理的一种特殊形式,每次接收到一个时间间隔的数据才会去处理,所以天生很难在实时性上有所提升。想要在流处理的实时性上提升,就不能继续用微批处理的模式,而要想办法实现真正的流处理,即每当有一条数据输入就立刻处理,不做等待。

流式处理系统的演变

聊到现代流式数据处理, 谷歌大神 Tyler Akidau 在2015年8月关于流式计算写了两篇博客(Streaming 101: 批处理之上的世界(一), Streaming 102: 批处理之上的世界(二)(上), Streaming 102: 批处理之上的世界(二)(下), 【总结】批处理之上的世界(二)),后来写了《Streaming System》一书。它探索了四个问题:

  1. 【What】流式数据处理系统计算出什么结果?结果由pipeline的转换器决定。转换器好比MapReduce中的Mapper、Reducer函数,Spark中的transform算子。
  2. 【where】流式数据的结果在哪里计算?流式数据由事件构成,根据事件时间,流式数据可以切分成一个个窗口,把无界数据变成有界数据,计算在窗口中完成。
  3. 【when】计算结果何时输出?水位线或触发器触发输出。水位线表示属于某个窗口时间范围的事件全部到达,如果需要在水位线之前输出结果,或者水位线之后,还有迟到的事件需要计算,需要触发器的帮助。
  4. 【How】如果修正计算结果?一个窗口的结果会被计算多次。每次计算结果可以独立地发送到下游,也可以更新之前计算的结果,还可以把之前的结果丢弃,再发送新的结果。

我们需要一个流式系统,能够达成三点目标:

  1. 数据的正确性,“正好一次”的数据处理机制,要做到这一点,我们需要在流式数据系统里,内置一个数据去重的机制。
  2. 系统的容错能力,把计算节点需要使用的“状态”信息持久化下来。这样,我们才能够做到真正的容错,而不是在系统出错的时候丢失一部分信息。而且,这个机制也有助于我们在线扩容。
  3. 对于时间窗口的正确处理,也就是能够准确地根据事件时间生成报表,而不是简单地使用进行处理的服务器的本地时间。并且,还需要能够考虑到分布式集群中,数据的传输可能会有延时的情况出现。把流式数据处理的时间窗口,以及触发机制内置到流式处理系统里面去。这样,我们就可以让我们的业务代码,专注于实现业务逻辑,而不是需要自己在应用代码里,搞一套时间窗口的维护和触发机制。

我们希望能有一个流式处理系统,帮助我们解决这些问题。我们并不希望,自己在写 Storm 的 Spout 代码的时候,写上一大堆代码,来解决正好一次的数据处理、Spout 中间状态的持久化,以及针对时间窗口的处理逻辑。因为这些问题,是流式数据处理的共性问题。

dataflow模型

2015 年,Google发表了一篇《The Dataflow Model》的论文。 Dataflow(三):一个统一的编程模型

Dataflow 的核心计算模型非常简单,它只有两个概念

  1. 一个叫做 ParDo,顾名思义,也就是并行处理的意思。地位相当于 MapReduce 里的 Map 阶段。所有的输入数据,都会被一个 DoFn,也就是处理函数处理。但是这些数据,不是在一台服务器上处理的,而是和 MapReduce 一样,会在很多台机器上被并行处理。只不过 MapReduce 里的数据处理,只有一个 Map 阶段和一个 Reduce 阶段。而在 Dataflow 里,Pardo 会和下面的 GroupByKey 组合起来,可以有很多层,就好像是很多个 MapReduce 串在一起一样。
  2. 另一个叫做 GroupByKey,也就是按照 Key 进行分组数据处理的问题。地位则是 MapReduce 里的 Shuffle 操作。在 Dataflow 里,所有的数据都被抽象成了 key-value 对。前面的 ParDo 的输入和 Map 函数一样,是一个 key-value 对,输出也是一系列的 key-value 对。而 GroupByKey,则是把相同的 Key 汇总到一起,然后再通过一个 ParDo 下的 DoFn 进行处理。

比如,我们有一个不断输入的日志流,想要统计所有广告展示次数超过 100 万次的广告。那么,我们可以先通过一个 Pardo 解析日志,然后输出(广告 ID,1)这样的 key-value 对,通过 GroupByKey,把相同的广告 ID 的数据分组到一起。然后再通过一个 ParDo,并行统计每一个广告 ID 下的展示次数。最后再通过一个 ParDo,过滤掉所有展示次数少于 100 万次的广告就好了。

流批一体:在 MapReduce 的计算模型下,会有哪些输入数据,是在 MapReduce 的任务开始之前就确定的。这意味着数据从 Map 端被 Shuffle 到 Reduce 端,只依赖于我们的 CPU、网络这些硬件处理能力。而在 Dataflow 里,输入的数据集是无边界的,随着时间的推移,不断会有新的输入数据加入进来。如果从这个角度来思考,那么我们之前把大数据处理分成批处理和流式处理,其实并没有找到两种数据处理的核心差异。因为,对于一份预先确定、边界明确的数据,我们一样可以使用流式处理。比如,我们可以把一份固定大小日志,放到 Kakfa 里,重放一遍给一个 Storm 的 Topology 来处理,那也是流式处理,但这是处理的有边界的数据。而对于不断增长的实时数据,我们一样可以不断定时执行 MapReduce 这样的批处理任务,或者通过 Spark Streaming 这样看起来是流式处理,其实是微批(Mini-Batch)的处理方式。

在 Dataflow 的论文里,Google 把整个大数据的流式处理,抽象成了三个概念。

  1. 是对于乱序数据,能够按照事件发生时间计算时间窗口的模型。在 Dataflow 模型里,需要的不只是 GroupByKey,实际在统计数据的时候,往往需要的是 GroupByKeyAndWindow。统计一个不考虑任何时间窗口的数据,往往是没有意义的。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而 AssignWindows 要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。需要注意,一个事件不只可以分配给一个时间窗口,而是可以分配给多个时间窗口。比如,我们有一个广告在 12:01 展示给了用户,但是我们统计的是“过去 2 分钟的广告展示”,那么这个事件,就会被分配给[12:00, 12:02) 和[12:01, 12:03) 两个时间窗口,我们原先一条的事件就可以变成多条记录。有了 Window 的信息之后,如果我们想要按照固定窗口或者滑动窗口统计数据,我们可以很容易地根据 Key+Window 进行聚合,完成相应的计算。窗口的分配和合并功能,就使得 Dataflow 可以处理乱序数据。相同的数据以不同的顺序到达我们的计算节点,计算的结果仍然是相同的。
  2. 根据数据处理的多维度特征,来决定计算结果什么时候输出的触发器模型。我们会遇到延时、容错等情况,所以我们还需要有一个机制告诉我们,在什么时候数据都已经到了,我们可以把计算结果向下游输出了。在 Dataflow 里,除了内置的基于水位信息的完成度触发器,它还能够支持基于处理时间、记录数等多个参数组合触发。而且用户可以实现自定义触发器,完全根据自己的需要来实现触发器逻辑。
  3. 能够把数据的更新和撤回,与前面的窗口模型和触发器模型集成的增量处理策略。

使用

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds 
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface 
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<KafkaEvent> input = env .addSource(
            new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties()) 
                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper())
        .shuffle();

        input.addSink(
            new FlinkKafkaProducer<>(
            parameterTool.getRequired("output-topic"),
            new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()), 
            parameterTool.getProperties(), 
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
        env.execute("Modern Kafka Example"); 
    }    
}

理论基础来自谷歌论文 《The dataflow model:A pracitical approach to balancing correctness,latency, and cost in massive-scale,unbounded,out-of-order data processing》

  1. 数据从上一个Operation节点直接Push到下一个Operation节点。
  2. 各节点可以分布在不同的Task线程中运行,数据在Operation之间传递。
  3. 具有Shuffle过程,数据从上游Operation push 到下游Operation,不像MapReduce模型,Reduce从Map端拉取数据。
  4. 实现框架有ApacheStorm和ApacheFlink以及ApacheBeam。

DataStream API

  1. source
  2. sink
  3. 转换操作
    1. 基于单条记录filter/map
    2. 基于窗口 window
    3. 多流合并 union join connect
    4. 单流切分 split
  4. DataStream 之间的转换

设计

Flink 中最核心的数据结构是 Stream,它代表一个运行在多个分区上的并行流。在 Stream 上同样可以进行各种转换操作(Transformation)。与 Spark 的 RDD 不同的是,Stream 代表一个数据流而不是静态数据的集合。所以,它包含的数据是随着时间增长而变化的。而且 Stream 上的转换操作都是逐条进行的,即每当有新的数据进来,整个流程都会被执行并更新结果。

flink采用了基于操作符(Operator)的连续流模型,当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow,Streaming Dataflow 包括 Stream 和 Operator(操作符)。转换操作符把一个或多个 Stream 转换成多个 Stream。每个 Dataflow 都有一个输入数据源(Source)和输出数据源(Sink)。与 Spark 的 RDD 转换图类似,Streaming Dataflow 也会被组合成一个有向无环图去执行。在 Flink 中,程序天生是并行和分布式的。一个 Stream 可以包含多个分区(Stream Partitions),一个操作符可以被分成多个操作符子任务,每一个子任务是在不同的线程或者不同的机器节点中独立执行的。

实现

浓浓的tf 提交dag的味道,区别是flink 和spark 一样是集群先跑起来,再提交任务。中间设计到graph 拆分为task 、调度task 到具体TaskManager的过程。

  1. JobManager 管理节点,每个集群至少一个,管理整个集群计算资源,Job管理与调度执行,以及 Checkpoint 协调。
  2. TaskManager :每个集群有多个TM ,负责计算资源提供。
  3. Client :本地执行应用 main() 方法解析JobGraph 对象,并最终将JobGraph 提交到 JobManager 运行,同时监控Job执行的状态。

dispatcher 任务调度

  1. 一个jobGraph 对应一个 jobManager(JobMaster) 双层资源调度
  2. cluster -> job, slotManager 会给 jobGraph 对应的jobManager 分配多个slot (slotPool)
  3. job -> task, 单个slot 可以用于一个或多个task 执行; 但相同的task 不能在一个slot 中运行

Flink 四种 Graph 转换

  1. 第一层: Program -> StreamGraph。算子之间的拓扑关系。
  2. 第二层: StreamGraph -> JobGraph。 不涉及数据跨节点交换 的Operation 会组成 OperatorChain(最终在一个task 里运行)
  3. 第三层: JobGraph -> ExecutionGraph
  4. 第四层: Execution -> 物理执行计划

Flink 提供的两个核心 API 就是 DataSet API 和 DataStream API。你没看错,名字和 Spark 的 DataSet、DataFrame 非常相似。顾名思义,DataSet 代表有界的数据集,而 DataStream 代表流数据。Flink 这样基于流的模型是怎样支持批处理的?在内部,DataSet 其实也用 Stream 表示,静态的有界数据也可以被看作是特殊的流数据,而且 DataSet 与 DataStream 可以无缝切换。所以,Flink 的核心是 DataStream。

如果要进行流计算,Flink 会初始化一个流执行环境 StreamExecutionEnvironment,然后利用这个执行环境构建数据流 DataStream。

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

如果要进行批处理计算,Flink 会初始化一个批处理执行环境 ExecutionEnvironment,然后利用这个环境构建数据集 DataSet。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet text = env.readTextFile("/path/to/file");

然后在 DataStream 或者 DataSet 上执行各种数据转换操作(transformation),这点很像 Spark。不管是流处理还是批处理,Flink 运行时的执行引擎是相同的,只是数据源不同而已。

其它

流式处理,支持一条一条处理, 对于扩展operation,spark/flink client 会把main代码 和 jar 打包成一个package 上传到Master,jobGraph上附带job 相关的自定义jar信息, taskManager 执行task 前下载 相应的jar (然后由classloader)加载执行,而tf 因为其特殊性 就只能自定义op了。

Flink 新一代流计算的阶段总结 未读。