Flink 入门指北

名称解释

Apache Flink 是一个开源流处理框架,用于在无界有界数据流上进行有状态计算。

何为有界无界?

有界的数据就像罐装水,水的体积是确定的,而无界的数据就像自来水,水是源源不断的。有界的数据可以用批处理的方式来计算,无界的数据需要用流处理的方式来处理。

流计算和批计算的区别

  1. 数据时效性不同:流计算实时、低延迟,批计算非实时、高延迟。
  2. 数据特征不同:流计算的数据一般是动态的、没有边界的,而批处理的数据一般是静态数据。
  3. 应用场景不同:流计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控等。批计算应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。
  4. 运行方式不同:流计算的任务是持续进行的,批计算的任务则一次性完成。

时间概念

  1. Event Time:事件时间(通常是数据的最原始的创建时间),Event Time 是必须是存在于事件里面的数据。使用 Flink SQL 时,EventTime 时间属性必须在源表 DDL 中声明,可以将源表中的某一字段声明成 Event Time。
  2. Processing Time:系统对事件进行处理的本地系统时间。它既不需要从数据里面获取时间,也不需要生成 watermark。

时间概念

何为状态?

当前执行的代码需要依赖之前的数据才能准确执行,之前数据称为状态。

状态后端

Flink 有三种状态后端,分为 MemoryStateBackendFsStateBackendRocksDBStateBackend

  1. MemoryStateBackend 主要用于测试,内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在 TaskManager 的 JVM 堆上,而将 checkpoint 存储在 JobManager 的内存中,特点是快速、低延迟,但不稳定。
  2. FsStateBackend 可用于生产,分钟级别的窗口计算,将 checkpoint 存到远程的持久化文件系统 (FileSystem) 上,而对于本地状态跟 MemoryStateBackend 一样,也会存在 TaskManager 的 JVM 堆上,同时拥有内存级的本地访问速度和更好的容错保证。
  3. RocksDBStateBackend 可用于生产,超大状态的存储,将所有状态序列化后,存入本地的 RocksDB 中存储。

状态容错

checkpoint:按照周期性的全域一致快照集合。
savepoint :job被 cancel 时,指定需要将当前全域一致快照进行存储

何为窗口?

窗口是流计算的核心,在处理流计算时,我们一般把数据流分成很多小块来处理,每一块就是一个窗口,比如按时间来划分就有 Time Window,按数据来划分就有 Count Window

Time Window

Time Window 是按时间驱动的,有 Sliding WindowsTumbling WindowsSession WindowsGlobal Windows

  1. Sliding Windows 滑动窗口:是以一个固定的步长 (Slide)不断向前滑动,数据可以被重复计算,取决于 Size(窗口大小) 和 Slide Time(滑动间隔)。滑动窗口

  2. Tumbling Windows 滚动窗口:它的窗口之间不重叠,且窗口大小固定,是一种特殊的 Sliding Windows(Window Size = Window Slide)。滚动窗口

  3. Session Windows 会话窗口:是一种 Window Size 可变的,根据 Session Gap 切分不同的窗口。当一个窗口在大于 Session Gap 的时间内没有接收到新数据时,窗口关闭。会话窗口

  4. Global Windows 全局窗口:这个窗口需要指定自定义的触发器才有用,否则不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束点。全局窗口

Count Window

Count Window 是按数据驱动的,分为滑动计数窗口和滚动计数窗口。

Window Assinger

窗口分配器,将数据流中的元素分配到对应的窗口。

Window Trigger

触发器,决定了何时启动 Window Function 来处理窗口中的数据以及何时将窗口内的数据清理。

Flink 内置的 Window Trigger 有以下这些:

  1. ProcessingTimeTrigger:一次触发,基于 ProcessTime 触发,当机器时间大于窗口结束时间触发。
  2. EventTimeTrigger:一次触发,基于 EventTime,当 Watermark 大于窗口结束时间触发。
  3. ContinuousProcessionTimeTrigger:多次触发,基于 processTime 的固定时间间隔触发。
  4. ContinuousEventTimeTrigger:多次触发,基于 EventTime 的固定时间间隔触发。
  5. CountTrigger:多次触发,基于 Element 的固定条数触发。
  6. DeltaTrigger:多次触发,基于本次 Element 和上次触发 Trigger 的 Element 做 Delta 计算,超过指定的 Threshold 后触发。
  7. PurgingTrigger:对 Trigger 的封装实现,用于 Trigger 触发后额外清理中间状态数据。

Window Evictors

数据剔除器。

Flink 内置的 Window Evictors 有以下这些:

  1. CountEvictor:保留一定数目的元素,多余的元素按照从前到后的顺序先后清理。
  2. TimeEvictor:保留一个时间段的元素,早于这个时间段的元素会被清理。
  3. DeltaEvictor:窗口计算时,最近一条 Element 和其他 Element 做 Delta 计算,仅保留 Delta 结果在指定 Threshold 内的 Element。

Window Function

当满足窗口触发条件后,对窗口内的数据使用窗口处理函数进行处理。

  1. ReduceFunction (增量)
  2. AggregateFunction (增量)
  3. FoldFunction (增量)
  4. ProcessWindowFunction (全量)

watermark

watermark 是基于已经收集的消息来估算是否还有消息未到达,本质上是一个时间戳。该时间戳反映的是事件发生的时间,而不是事件处理的时间。watermark 在一定程度上解决了事件乱序问题。

watermark 设定方法分为 Punctuated WatermarkPeriodic Watermark

  1. 标点水位线 (Punctuated Watermark):通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。在实际的生产中 Punctuated 方式在 TPS 很高的场景下会产生大量的 Watermark 在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择 Punctuated 的方式进行 Watermark 的生成。
  2. 定期水位线 (Periodic Watermark):周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个 Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。在实际的生产中 Periodic 的方式必须结合时间和积累条数两个维度继续周期性产生 Watermark,否则在极端情况下会有很大的延时。

有了 watermark 自然就有了迟到事件,虽说 watermark 表明着早于它的事件不应该再出现,但是接收到 watermark 以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了 watermark 的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

  1. 将迟到事件视为错误消息并丢弃默认
  2. 重新激活已经关闭的窗口并重新计算以修正结果(Allowed Lateness
  3. 将迟到事件收集起来另外处理(Side Output

总结

  1. Watermark = Max EventTime - Late Threshold
  2. Late Threshold 越高,数据处理时延越高
  3. 启发式更新
  4. 解决一段时间的乱序事件
  5. 窗口触发条件: Current Watermark > Window EndTime
  6. Watermark 的主要目的是告诉窗口不再会有比当前 Watermark 更晚的数据到达

Planner

Flink PlannerBlink Planner

两者区别:

  1. Blink 将批处理作业视为流处理的一种特例,批处理作业不会转化为 DataSet 程序而是转化为 DataStream 程序,流作业也一样。
  2. Blink 计划器不支持 BatchTableSource, 而是使用有界的 StreamTableSource 来代替。
  3. 基于字符串的键值配置选项仅在 Blink 计划器中使用。
  4. PlannerConfig 在两种计划器中实现(CalciteConfig)是不同的。
  5. Blink 计划器会将多 sink (multiple-sinks) 优化成一张有向无环图 (DAG),TableEnvironment 都支持该特性。旧计划器总是将每个 sink 都优化成一个新的 DAG,且所有的图相互独立。
  6. 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。

Flink 架构

架构图

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。

这个进程由三个不同的组件组成:

  1. ResourceManager:ResourceManager 负责 Flink 集群中的资源提供、回收、分配,它管理 task slots。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
  2. Dispatcher:Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它运行 Flink WebUI 用来提供作业执行信息。
  3. JobMaster:JobMaster 负责管理单个 JobGraph 的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby。

TaskManager

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。一个 task slot 中可以执行多个算子。

每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的。

集群

生命周期:客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争,例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败。类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响,就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。

生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。

其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。

生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main() 方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口 ApplicationClusterEntryPoint 负责调用 main() 方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。

资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

用例剖析

需要注意的点

数据重复问题

CDC(ChangeLog Data Capture,变更日志数据捕获)工具在正常的操作环境下是能以 exactly-once 的语义投递每条变更事件,但是在有故障发生时, CDC 只能保证 at-least-once 的投递语义,这种情况就会导致 Flink query 运行得到的错误的结果或者非预期的异常,处理方法是设置作业参数 table.exec.source.cdc-events-duplicate=true,且同时要在该 source 上定义好 PRIMARY KEY

数据消费问题

消费同个 kafka topicsink sql 要放在同个 paragraph 里面,且要加上 runAsOne=true 的参数,这样做的好处是从 kafka topic 消费一条数据会复制给同个 paragraph 里面的多个任务。

flink job overview

多流 join

尽量减少多流 join

Q.E.D.


知识的价值不在于占有,而在于使用