flink并行度

wangfengxia wangfengxia     2022-12-30     580

关键词:

并行执行

本节介绍如何在Flink中配置程序的并行执行。FLink程序由多个任务(转换/操作符、数据源和sinks)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行性。

 

如果要使用保存点,还应该考虑设置最大并行性(或最大并行性)。当从保存点还原时,可以改变特定运算符或整个程序的并行性,并且该设置指定并行性的上限。这是必需的,因为FLINK内部将状态划分为key-groups,并且我们不能拥有+INF的key-group数,因为这将对性能有害。

Flink中人物的并行度可以从多个不同层面设置:

 

 

 

 

 

操作算子层

操作算子,数据源,数据接收器等这些并行度都可以通过调用他们的setParallelism()方法设置。例如:

val env = StreamExecutionEnvironment.getExecutionEnvironment

 

val text = [...]

val wordCounts = text

? ?.flatMap _.split(" ") map (_, 1)

? ?.keyBy(0)

? ?.timeWindow(Time.seconds(5))

? ?.sum(1).setParallelism(5)

wordCounts.print()

 

env.execute("Word Count Example")

执行环境层面

flink程序执行需要执行环境上下文。执行环境为其要执行的操作算子,数据源,数据sinks都是设置了默认的并行度。执行环境的并行度可以通过操作算子显示指定并行度来覆盖掉。

默认的执行环境并行度可以通过调用setParallelism()来设置。例如,操作算子,数据源,数据接收器,并行度都设置为3,那么在执行环境层面,设置方式如下:

 

 

 

 

 

 

 

 

 

 

客户端层

在提交job 到flink的时候,在客户端侧也可以设置flink的并行度。客户端即可以是java工程,也可以是scala工程。Flink的Command-line Interface (CLI)就是这样一种客户端。

在客户端侧flink可以通过-p参数来设置并行度。例如:

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/bin/flink run -p 10 https://blog.csdn.net/rlnLo2pNEfx9c/article/examples/*WordCount-java*.jar

在java/scala客户端,并行度设置方式如下:

 

 

 

 

 

 

 

 

 

 

系统层面

系统层面的并行度设置,会针对所有的执行环境生效,可以通过parallelism.default,属性在conf/flink-conf.yaml文件中设置。

 

设置最大并行度

设置最大并行度,实际上调用的方法是setMaxParallelism(),其调用位置和setParallelism()一样。

默认的最大并行度是近似于operatorParallelism + (operatorParallelism / 2),下限是127,上线是32768.

 

值得注意的是将最大的并行的设置为超级大的数可能会对性能造成不利的影响,雅思6.5因为一些状态后端是必须要保存内部数据结构的,这个数据结构跟key-group数量相匹配(这是可重定状态的内部实现机制)。

配置taskmanagerslot

flink通过将项目分成tasks,来实现并行的执行项目,划分的tasks会被发到slot去处理。

集群中Flink的taskmanager提供处理slot。Slots数量最合适的是跟taskmanager的cores数量成正比。当然,taskmanager.numberOfTaskSlots的推荐值就是cpu核心的数目。

当启动一个任务的时候,我们可以为其提供默认的slot数目,其实也即是flink工程的并行度,设置方式在上面已经有详细介绍。

 

技术分享图片

技术分享图片

 

推荐阅读

Flink:动态表上的连续查询

技术分享图片


文章来源:https://blog.csdn.net/rlnLo2pNEfx9c/article/details/80809738

flink实战系列如何给flink任务设置合理的并行度?

如何给Flink任务设置合理的并行度?背景介绍最近看到很多朋友都在问这个问题,当我在开发Flink实时计算任务的时候,如何给每个算子设置合理的并行度呢?如果设置多了可能会出现资源浪费的情况,如果设置少了任务可能会... 查看详情

flink并行度可以有如下几种指定方式

...atorLevel(算子级别)(可以使用)一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来指定2.ExecutionEnvironmentLevel(Env级别)(可以使用)执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来... 查看详情

flink调优之压测任务的合理并行度

压测合理并行度的方法:①获得高峰期的qps,如每秒5w条②消费该高峰期的数据,达到反压状态后查看每秒处理的数据量y,就是单并行度的处理上限③x除以y,增加一点富余:乘以1.2,就是合理的并行度。在flink中,设置并行度... 查看详情

1.21.flinkslot和并行度(parallelism)flink的并行度由什么决定的?flink的task是什么?slot和parallelism(代码片段)

1.21.FlinkSlot和并行度(parallelism)1.21.1.Flink的并行度由什么决定的?1.21.2.Flink的task是什么?1.21.3.slot和parallelism1.21.3.1.slot是指taskmanager的并发执行能力1.21.3.2.parallelism是可配置、可指定的1.21.4.slot和parallelism总结1 查看详情

flinkflink中keygroups与最大并行度(代码片段)

1.概述转载:Flink中KeyGroups与最大并行度相关文章:【Flink】并行度设置导致的负载倾斜KeyGroups不均衡计算最大并行度【Flink】Flinkkey应该分配到哪个KeyGroup以及KeyGroup分配在哪个subtask2.何为KeyGroupsKey-Groups是Flink对Key进行分组... 查看详情

合理评估flink的并行度

参考技术AFlink任务并行度合理行一般根据峰值流量进行压测评估,并且根据集群负载情况留一定量的buffer资源1.⭐如果数据源已经存在,则可以直接消费进行测试2.⭐如果数据源不存在,需要自行造压测数据进行测试1.n... 查看详情

flink自定义非并行的source,即source的并行度只为1(代码片段)

自定义非并行的Source,即Source的并行度只为1如果实现了SourceFunction接口或继承了RichSourceFunction抽象类,得到的DataStream为只有一个并行;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStre... 查看详情

flink并行度设置导致的负载倾斜keygroups不均衡计算最大并行度(代码片段)

1.概述转载:使用Flink前需要知道的10个『陷阱』我们从一个简单的问题开始:在FlinkUI中调查某个作业的子任务时,关于每个子任务处理的数据量,你可能会遇到如下这种奇怪的情况。每个子任务的工作负载并不均... 查看详情

25.flink监控什么是metricsmetrics分类flink性能优化的方法合理调整并行度合理调整并行度flink内存管理sparkvsflink时间机制容错机制等(代码片段)

...能优化26.1.复用对象26.2.数据倾斜26.3.异步IO26.4.合理调整并行度27.Flink内存管理28.SparkVSFlink28.1.应用场景28.2.API28.3.核心角色/流程原理28.3.1.spark28.3.2.Flink28.4.时间机制28.5.容错机制28.6.窗口28.7.整合Kafka28.8.其他的28.9.单独补充:流式计... 查看详情

25.flink监控什么是metricsmetrics分类flink性能优化的方法合理调整并行度合理调整并行度flink内存管理sparkvsflink时间机制容错机制等(代码片段)

...能优化26.1.复用对象26.2.数据倾斜26.3.异步IO26.4.合理调整并行度27.Flink内存管理28.SparkVSFlink28.1.应用场景28.2.API28.3.核心角色/流程原理28.3.1.spark28.3.2.Flink28.4.时间机制28.5.容错机制28.6.窗口28.7.整合Kafka28.8.其他的28.9.单独补充:流式计... 查看详情

flink并行度

...型,可以控制需要的计算资源。在flink整个runtime的模型中并行度是一个很重要的概念,通过设置并行度可以为认为分配合理的计算资源,做到资源的合理配置。整个flink的架构简单的说是中心控制(jobManager)+多点分布执行(taskM... 查看详情

04-flink-1.10.1-流处理wordcount代码里控制并行度(代码片段)

...应该是先取单词的hash值然后用对处理数据的线程数(并行度)取模/取余的到的索引hello总是会进 查看详情

04-flink-1.10.1-流处理wordcount代码里控制并行度(代码片段)

...应该是先取单词的hash值然后用对处理数据的线程数(并行度)取模/取余的到的索引hello总是会进 查看详情

flinkrescalestate调整有状态算子的并行度

...为了增大或较小输入数据的速率,需要灵活地调整算子的并行度。对于无状态算子而言,并行度的调整没有任何问题,但更改有状态算子的并行度显然就没那么简单了,因为它们的状态需要重新分区并分配给更多或更少的并行任... 查看详情

flink架构浅析&并行度slot的关系

...r中通过管理多个TaskSlot资源池进行对资源进行有效管理。并行度案例分析Flink集群中有3个TaskManager节点,每个TaskManager的Slot数量为3本质:Slot其实就是集 查看详情

flink内幕-作业调度--flink1.13

...ger有一到多个taskslot,每个taskslot可以运行一条由多个并行task组成的流水线。这样一条流水线由多个连续的task组成,比如并行度为n的MapFunction和并行度为n的ReduceFunct 查看详情

flink内幕-作业调度--flink1.13

...ger有一到多个taskslot,每个taskslot可以运行一条由多个并行task组成的流水线。这样一条流水线由多个连续的task组成,比如并行度为n的MapFunction和并行度为n的ReduceFunct 查看详情

flink调优_yarn动态分配cpu资源

...默认容器资源为最小核心数,即一个cpu,所以我们要根据并行度去调整分配的cpu资源程序如下:指定并行度为5,指定每个TM的slot数为2-p5\\ -Dtaskmanager.numberOfTaskSlots=2\\所以虽然配置了2个slot(并发度为2),但是两个task不能同时运... 查看详情