Flink Slot详解与Job ution Graph优化.docx

上传人:A*** 文档编号:142725291 上传时间:2020-08-22 格式:DOCX 页数:10 大小:586.39KB
返回 下载 相关 举报
Flink Slot详解与Job ution Graph优化.docx_第1页
第1页 / 共10页
Flink Slot详解与Job ution Graph优化.docx_第2页
第2页 / 共10页
Flink Slot详解与Job ution Graph优化.docx_第3页
第3页 / 共10页
Flink Slot详解与Job ution Graph优化.docx_第4页
第4页 / 共10页
Flink Slot详解与Job ution Graph优化.docx_第5页
第5页 / 共10页
点击查看更多>>
资源描述

《Flink Slot详解与Job ution Graph优化.docx》由会员分享,可在线阅读,更多相关《Flink Slot详解与Job ution Graph优化.docx(10页珍藏版)》请在金锄头文库上搜索。

1、Flink Slot详解与Job Execution Graph优化前言近期将Flink Job从Standalone迁移至了OnYarn,随后发现Job性能较之前有所降低:迁移前有8.3W+/S的数据消费速度,迁移到Yarn后分配同样的资源但消费速度降为7.8W+/S,且较之前的消费速度有轻微的抖动。经过原因分析和测试验证,最终采用了在保持分配给Job的资源不变的情况下将总Container数量减半、每个Container持有的资源从1C2G 1Slot变更为2C4G 2Slot的方式,使该问题得以解决。经历该问题后,发现深入理解Slot和Flink Runtime Graph是十分必要的,

2、于是撰写了这篇文章。本文内容分为两大部分,第一部分详细的分析Flink Slot与Job运行的关系,第二部详细的介绍遇到的问题和解决方案。Flink SlotFlink集群是由JobManager(JM)、TaskManager(TM)两大组件组成的,每个JM/TM都是运行在一个独立的JVM进程中。JM相当于Master,是集群的管理节点,TM相当于Worker,是集群的工作节点,每个TM最少持有1个Slot,Slot是Flink执行Job时的最小资源分配单位,在Slot中运行着具体的Task任务。对TM而言:它占用着一定数量的CPU和Memory资源,具体可通过taskmanager.num

3、berOfTaskSlots, taskmanager.heap.size来配置,实际上taskmanager.numberOfTaskSlots只是指定TM的Slot数量,并不能隔离指定数量的CPU给TM使用。在不考虑Slot Sharing(下文详述)的情况下,一个Slot内运行着一个SubTask(Task实现Runable,SubTask是一个执行Task的具体实例),所以官方建议taskmanager.numberOfTaskSlots配置的Slot数量和CPU相等或成比例。当然,我们可以借助Yarn等调度系统,用Flink On Yarn的模式来为Yarn Container分配指

4、定数量的CPU资源,以达到较严格的CPU隔离(Yarn采用Cgroup做基于时间片的资源调度,每个Container内运行着一个JM/TM实例)。而taskmanager.heap.size用来配置TM的Memory,如果一个TM有N个Slot,则每个Slot分配到的Memory大小为整个TM Memory的1/N,同一个TM内的Slots只有Memory隔离,CPU是共享的。对Job而言:一个Job所需的Slot数量大于等于Operator配置的最大Parallelism数,在保持所有Operator的slotSharingGroup一致的前提下Job所需的Slot数量与Job中Operat

5、or配置的最大Parallelism相等。关于TM/Slot之间的关系可以参考如下从官方文档截取到的三张图:图一:Flink On Yarn的Job提交过程,从图中我们可以了解到每个JM/TM实例都分属于不同的Yarn Container,且每个Container内只会有一个JM或TM实例;通过对Yarn的学习我们可以了解到,每个Container都是一个独立的进程,一台物理机可以有多个Container存在(多个进程),每个Container都持有一定数量的CPU和Memory资源,而且是资源隔离的,进程间不共享,这就可以保证同一台机器上的多个TM之间是资源隔离的(Standalone模式下

6、,同一台机器下若有多个TM,是做不到TM之间的CPU资源隔离的)。图一图二:Flink Job运行图,图中有两个TM,各自有3个Slot,2个Slot内有Task在执行,1个Slot空闲。若这两个TM在不同Container或容器上,则其占用的资源是互相隔离的。在TM内多个Slot间是各自拥有 1/3 TM的Memory,共享TM的CPU、网络(Tcp:ZK、 Akka、Netty服务等)、心跳信息、Flink结构化的数据集等。图二图三:Task Slot的内部结构图,Slot内运行着具体的Task,它是在线程中执行的Runable对象(每个虚线框代表一个线程),这些Task实例在源码中对应的

7、类是org.apache.flink.runtime.taskmanager.Task。每个Task都是由一组Operators Chaining在一起的工作集合,Flink Job的执行过程可看作一张DAG图,Task是DAG图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个Job的Execution Graph。图三Operator ChainOperator Chain是指将Job中的Operators按照一定策略(例如:single output operator可以chain在一起)链接起来并放置在一个Task线程中执行。Operator Chain默认开启,可通过

8、StreamExecutionEnvironment.disableOperatorChaining()关闭,Flink Operator类似Storm中的Bolt,在Strom中上游Bolt到下游会经过网络上的数据传递,而Flink的Operator Chain将多个Operator链接到一起执行,减少了数据传递/线程切换等环节,降低系统开销的同时增加了资源利用率和Job性能。实际开发过程中需要开发者了解这些原理,并能合理分配Memory和CPU给到每个Task线程。注: 【一个需要注意的地方】Chained的Operators之间的数据传递默认需要经过数据的拷贝(例如:kryo.copy(

9、.)),将上游Operator的输出序列化出一个新对象并传递给下游Operator,可以通过ExecutionConfig.enableObjectReuse()开启对象重用,这样就关闭了这层copy操作,可以减少对象序列化开销和GC压力等,具体源码可阅读org.apache.flink.streaming.runtime.tasks.OperatorChain与org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。官方建议开发人员在完全了解reuse内部机制后才使用该功能,冒然使用可能会给程序

10、带来bug。Operator Chain效果可参考如下官方文档截图:图四:图的上半部分是StreamGraph视角,有Task类别无并行度,如图:Job Runtime时有三种类型的Task,分别是Source-Map、keyBy/window/apply、Sink,其中Source-Map是Source()和Map()chaining在一起的Task;图的下半部分是一个Job Runtime期的实际状态,Job最大的并行度为2,有5个SubTask(即5个执行线程)。若没有Operator Chain,则Source()和Map()分属不同的Thread,Task线程数会增加到7,线程切换和

11、数据传递开销等较之前有所增加,处理延迟和性能会较之前差。补充:在slotSharingGroup用默认或相同组名时,当前Job运行需2个Slot(与Job最大Parallelism相等)。图四Slot SharingSlot Sharing是指,来自同一个Job且拥有相同slotSharingGroup(默认:default)名称的不同Task的SubTask之间可以共享一个Slot,这使得一个Slot有机会持有Job的一整条Pipeline,这也是上文提到的在默认slotSharing的条件下Job启动所需的Slot数和Job中Operator的最大parallelism相等的原因。通过Sl

12、ot Sharing机制可以更进一步提高Job运行性能,在Slot数不变的情况下增加了Operator可设置的最大的并行度,让类似window这种消耗资源的Task以最大的并行度分布在不同TM上,同时像map、filter这种较简单的操作也不会独占Slot资源,降低资源浪费的可能性。具体Slot Sharing效果可参考如下官方文档截图:图五:图的左下角是一个soure-map-reduce模型的Job,source和map是4 parallelism,reduce是3 parallelism,总计11个SubTask;这个Job最大Parallelism是4,所以将这个Job发布到左侧上面的

13、两个TM上时得到图右侧的运行图,一共占用四个Slot,有三个Slot拥有完整的source-map-reduce模型的Pipeline,如右侧图所示;注:map的结果会shuffle到reduce端,右侧图的箭头只是说Slot内数据Pipline,没画出Job的数据shuffle过程。图五图六:图中包含source-map6 parallelism、keyBy/window/apply6 parallelism、sink1 parallelism三种Task,总计占用了6个Slot;由左向右开始第一个slot内部运行着3个SubTask3 Thread,持有Job的一条完整pipeline;剩

14、下5个Slot内分别运行着2个SubTask2 Thread,数据最终通过网络传递给Sink完成数据处理。图六Operator Chain & Slot Sharing APIFlink在默认情况下有策略对Job进行Operator Chain 和 Slot Sharing的控制,比如:将并行度相同且连续的SingleOutputStreamOperator操作chain在一起(chain的条件较苛刻,不止单一输出这一条,具体可阅读org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(.)),Jo

15、b的所有Task都采用名为default的slotSharingGroup做Slot Sharing。但在实际的需求场景中,我们可能会遇到需人为干预Job的Operator Chain 或 Slot Sharing策略的情况,本段就重点关注下用于改变默认Chain 和 Sharing策略的API。 StreamExecutionEnvironment.disableOperatorChaining():关闭整个Job的OperatorChain,每个Operator独自占有一个Task,如上图四所描述的Job,如果disableOperatorChaining则source-map会拆开为so

16、urce(),map()两种Task,Job实际的Task数会增加到7。这个设置会降低Job性能,在非生产环境的测试或profiling时可以借助以更好分析问题,实际生产过程中不建议使用。 someStream.filter(.).map(.).startNewChain().map():startNewChain()是指从当前Operatormap开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。 someStream.map(.).disableChaining():disableChaining()是指当前Operatormap禁用OperatorChain,即:Operatormap会独自占用一个Task。 someStream.map(.).

展开阅读全文
相关资源
正为您匹配相似的精品文档
相关搜索

最新文档


当前位置:首页 > IT计算机/网络 > 其它相关文档

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号