Flink

7 minute read

https://juejin.cn/post/6844904014526545934

flink vs spark streaming 对比? Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型

架构不同 flink的角色有jobmanager/taskmanager/slot,同时flink根据用户用户提交的代码生成streamgraph,进过优化 生成jobgraph,jobgraph也是flink ui所看到的拓扑图,jobmanager会根据jobgraph生成execution graph, execution graph是最核心的数据结构,job manager会根据execution graph调度任务

时间机制 flink支持多种时间类型,包括处理时间、进入时间、事件时间,同时支持watermark处理滞后数据

flink集群角色和作用是什么? jobmanager: 集群中的master,负责接收job、协调检查点、故障恢复等,同时管理flink节点taskmanager taskmanager: 实际干活的,每个taskmanager负责管理其所在节点的资源信息,如内存、磁盘、网络,在启动的时候向jobmanager汇报 client: flink提交程序的客户端,当提交一个flink程序时,首先创建一个client,该client首先对代码进行预处理,然后提交给jobmanager

flink ha机制是什么? standalone模式:依赖zk实现ha 在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。 当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。 yarn cluster模式: 依靠 Yarn 本身来对 JobManager 做 HA,完全依靠着 Yarn 中的 ResourceManager实现故障恢复。

介绍一下fink的slot,它的作用是什么? taskmanager是实际负责计算的worker,taskmanager是一个jvm进程,并会以独立的线程来执行一个task或多个subtask。 为了控制一个taskmanager能接收线程的最大数量,flink提出slot的概念来作限制。slot会均分jvm的内存,避免不同slot的线程任务相互竞争内存资源,实现内存隔离,但不会实现cpu隔离。

flink的槽slot和并行度parallelism有什么关系? flink程序最大并行度等于slot的最大可用数量,即taskmanager的数量乘以每个taskmanager上slot的数量

flink之所以有这么高的吞吐量的原因,你认为有什么? 为了更高效地分布式执行,flink会尽可能的将多并行度的上下游算子的实例链接在一起,也就是尽可能的将上下游算子放在同一个slot中执行,这样做的好处是 减少线程之间的切换,减少数据的序列化和反序列化,减少数据的io交互,减少延迟的同时提高整体的吞吐量。

flink形成operator chain算子链的条件是什么? 需要基本的一个保证是算子链不会改变拓扑结构。 1、上下游的并行度一致 2、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) 3、上下游节点都在同一个 slot group 中(下面会解释 slot group) 4、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 5、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 6、两个节点间数据分区方式是 forward 7、用户没有禁用 chain

谈下项目中使用的flink常用算子及上下游算子间的分区策略? 分区策略是指一条数据如何从上游算子发给下游算子,flink默认实现8种分区策略,在flink ui上也可以看到 注意:flink改变并行度,默认是reblance的分区策略,这种可能会带来数据乱序执行的问题,线上环境出过类似问题,印象深刻

globalpartitioner: 数据会被分发到下游算子的第一个实例执行 shufflepartitioner: 数据会被随机分发下游算子的一个实例中执行 reblancepartitioner: 数据会被循环发送到下游的每一个算子执行 rescalepartitioner: 根据上下游算子的并行度,循环输出到下游算子 broadcastpartitioner: 广播方式发往下游每个算子 forwardpartitioner: 数据会被分发到下游本地算子执行,要求上下游算子并行度一致 keygroupstreampartitioner: 将数据按key的hash值发往下游算子执行 custompartitionerwrapper: 自定义分区器,实现partitioner接口

map:常用于转换操作 reblance分区 flatmap: 一对多输出 filter: 过滤器 keyby: hash window: union:

startNewChain 指示从该operator开始一个新的chain(与前面截断,不会被chain到前面) disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起) 默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许slot共享有以下两点好处: Flink 集群所需的task slots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。 如果有slot共享,将keyAggregation/sink的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。 SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。相应的,还有一个 CoLocationGroup 类用来强制将 subtasks 放到同一个 slot 中。 CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。

每个tm上有多少个slot就变得很重要,以内: https://clay4444.github.io/posts/5c06dc97/ 每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。 每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。 slot共享也变得很重要,因为: Flink 集群所需的task slots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将keyAggregation/sink的2个并行度增加到6个,能充分利用slot资源,同时保证每个 TaskManager 能平均分配到重的 subtasks(看图)

flink的分布式缓存是什么? 依托于hdfs,把文件放到hdfs,每一个taskmanager中的任务去hdfs拉取文件。

什么是广播变量? 首先广播变量是只读的,不允许修改 Broadcast是一份存储在TaskManager内存中的只读的缓存数据 1、从clinet端将一份需要反复使用的数据封装到广播变量中,分发到每个TaskManager的内存中保存 2、TaskManager中的所有Slot所管理的线程在执行task的时候如果需要用到该变量就从TaskManager的内存中读取数据,达到数据共享的效果

flink中的窗口是什么?有哪些窗口函数? WindowAssigner负责将输入的数据分配到一个或多个窗口,Flink内置了许多WindowAssigner,这些WindowAssigner可以满足大部分的使用场景。比如tumbling windows, sliding windows, session windows , global windows。 如果这些内置的WindowAssigner不能满足你的需求,可以通过继承WindowAssigner类实现自定义的WindowAssigner。

flink支持两种划分窗口的方式,time和count tumbling-window 滚动窗口 sliding-window 滑动窗口 time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5)) time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3)) count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5) count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

通过窗口函数,在每个窗口上对窗口内的数据进行处理。窗口函数主要分为两种,一种是增量计算,如reduce和aggregate,一种是全量计算,如process。 增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。 全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。 reduceFunction:接受两个相同类型的输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。 aggregateFunction:增量计算窗口函数,也只保存了一个中间状态数据,但AggregateFunction使用起来更复杂一些 ProcessWindowFunction要对窗口内的全量数据都缓存。在Flink所有API中,process算子以及其对应的函数是最底层的实现,使用这些函数能够访问一些更加底层的数据,比如,直接操作状态等。

ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。 但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。

ProcessWindowFunction与增量计算相结合 当我们既想访问窗口里的元数据,又不想缓存窗口里的所有数据时,可以将ProcessWindowFunction与增量计算函数相reduce和aggregate结合。 对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。

flink中的checkpoint是怎么做的? flink在计算的过程中需要存储中间状态来避免数据丢失和故障恢复,flink提供三种状态存储方式,MemoryStateBackend、FsStateBackend、RocksDBStateBackend

flink中的时间概念有哪几种类型?watermark有什么作用? 如果以 EventTime 为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。 如果以 IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。 如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。 Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。一般来讲Watermark经常和Window一起被用来处理乱序事件。

flink是如何高效进行数据交换? 数据需要在不同的task中进行交换,整个数据交换是有 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。 Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。

flink如何实现容错? 依靠checkpoint和state机制,Flink 中提供了三种形式的存储后端用来存储状态,分别是 MemoryStateBackend, FsStateBackend 和 RocksDBStateBackend flink在保存checkpoint的时候,需要保存算子的状态既包括算子状态,又包括系统状态,例如缓冲区buffer。 Flink 实现了一个轻量级的分布式快照机制,其核心点在于 Barrier。 Coordinator 在需要触发检查点的时候要求数据源注入向数据流中注入 barrie, barrier 和正常的数据流中的消息一起向前流动,相当于将数据流中的消息切分到了不同的检查点中。当一个 operator 从它所有的 input channel 中都收到了 barrier, 则会触发当前 operator 的快照操作,并向其下游 channel 中发射 barrier。当所有的 sink 都反馈收到了 barrier 后,则当前检查点创建完毕。

一个关键的问题在于,一些 operator 拥有多个 input channel,它往往不会同时从这些 channel 中接收到 barrier。 如果 Operator 继续处理 barrier 先到达的 channel 中的消息,那么在所有 channel 的 barrier 都到达时,operator 就会处于一种混杂的状态。 在这种情况下,Flink 采用对齐操作来保证 Exactly Once 特性。Operator 会阻塞 barrier 先到达的 channel,通常是将其流入的消息放入缓冲区中,待收到所有 input channel 的 barrier 后,进行快照操作,释放被阻塞的 channel,并向下游发射 barrier。

对齐操作会对流处理造成延时,但通常不会特别明显。如果应用对一致性要求比较宽泛的话,那么也可以选择跳过对齐操作。这意味着快照中会包含一些属于下一个检查点的数据,这样就不能保证 Exactly Once 特性,而只能降级为 At Least Once

flink如何实现内存管理? Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。

理论上Flink的内存管理分为三部分: Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改 Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。 User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。

flink如何实现序列化和反序列化? Apache Flink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。

flink window出现数据倾斜,怎么解决? 在业务上规避这类问题 例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。 Key的设计上 把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。 参数设置 Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。 业务上生成不同key

flink任务延迟高,怎么解决这个问题? 在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。 资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。 作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。

flink如何处理反压? Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。 Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。 二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。

trigger函数有哪几种类型? 触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。 增量计算窗口函数对每个新流入的数据直接进行聚合,Trigger决定了在窗口结束时将聚合结果发送出去; 全量计算窗口函数需要将窗口内的元素缓存,Trigger决定了在窗口结束时对所有元素进行计算然后将结果发送出去。 当满足某个条件,Trigger会返回一个名为TriggerResult的结果:

CONTINUE:什么都不做。 FIRE:启动计算并将结果发送给下游,不清理窗口数据。 PURGE:清理窗口数据但不执行计算。 FIRE_AND_PURGE:启动计算,发送结果然后清理窗口数据。

WindowAssigner都有一个默认的Trigger。比如基于Event Time的窗口会有一个EventTimeTrigger,每当窗口的Watermark时间戳到达窗口的结束时间,Trigger会发送FIRE。 此外,ProcessingTimeTrigger对应Processing Time窗口,CountTrigger对应Count-based窗口。

Updated:

Leave a comment