flink执行时之流处理程序生成流图

zhchoutai zhchoutai     2022-09-15     175

关键词:

流处理程序生成流图

DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。

什么是流图

流图(StreamGraph)是表示流处理程序拓扑的数据结构。它封装了生成作业图(JobGraph)的必要信息。它的类继承关系例如以下图所看到的:

技术分享

当你基于StreamGraph的继承链向上追溯,会发现它实现了FlinkPlan接口。

Flink效仿了传统的关系型数据库在运行SQL时生成运行计划并对其进行优化的思路。

FlinkPlan是Flink生成运行计划的基接口。定义在Flink优化器模块中。流处理程序相应的计划是StreamingPlan,可是当前针对流处理程序没有进行优化,因此这个类可看作是一个预留设计。

一个简单的实现“word count”的流处理程序,其StreamGraph的形象化表演示样例如以下图:

技术分享

Flink官方提供了一个计划可视化器来图形化运行计划,该计划可视化器基于Flink API所生成的计划的JSON格式表示绘制图形。可是须要注意的是,计划的JSON形式表示缺失了非常多属性以及部分节点(比方虚拟节点等);

上面的图是由“节点”和“边”组成的。节点在Flink中相应的数据结构是StreamNode,而边相应的数据结构是StreamEdge,StreamNode和StreamEdge之间有着双向的依赖关系。StreamEdge包括了其连接的源节点sourceVertex和目的节点targetVertex:

技术分享

而StreamNode中包括了与其连接的入边集合inEdges和出边集合outEdges:

技术分享

StreamEdge和StreamNode都有唯一的编号进行标识,可是各自编号的生成规则并不同样。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,事实上现是一个静态计数器:

protected static Integer idCounter = 0;
public static int getNewNodeId() {   
    idCounter++;   
    return idCounter;
}

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames 
                + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述例如以下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其它实际意义。

StreamNode是表示流处理中算子的数据结构。source和sink在StreamGraph中也是以StreamNode表示,它们也是一种算子,仅仅是由于它们是流的输入和输出因而有特定的称呼。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了算子的其它关键属性,比方其并行度、分区的键信息、输入与输出类型的序列化器等。

从直观上来看你已经知道了StreamNode和StreamEdge是StreamGraph的重要组成部分,可是为了生成JobGraph,StreamGraph非常显然必须得包括很多其它的内容。

总结一下,StreamGraph中包括的属性可分为三大类:

  • 流处理程序的运行配置。
  • 流处理程序拓扑中包括的节点和边的信息。
  • 迭代相关的信息;

当然环绕这些属性的方法非常多。比方增加边和节点,创建迭代的source/sink等。

当中的一个关键方法getJobGraph将用于生成JobGraph:

public JobGraph getJobGraph() {     
    if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
        && !checkpointConfig.isForceCheckpointing()) {      
        throw new UnsupportedOperationException(            
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "                  
            + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "                  
            + "
The user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");   
    }   
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);   
    return jobgraphGenerator.createJobGraph();
}

从上面的代码段也可见。当流处理程序中包括迭代逻辑时。检查点功能临时不被支持,在异常信息中Flink阐述了缘由:在迭代作业中无法保证“恰好一次”的语义。

流处理程序依赖StreamingJobGraphGenerator来生成JobGraph,至于怎样生成。兴许会进行剖析。

生成流图的源代码分析

了解了什么是流图(StreamGraph)之后。我们来分析它是怎样生成的。流图的生成是通过StreamExecutionEnvironment的getStreamGraph实例方法触发的:

public StreamGraph getStreamGraph() {   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   
    return StreamGraphGenerator.generate(this, transformations);
}

从代码段中可见,StreamGraph的生成依赖于一个名为transformations的集合对象,它是环境对象所收集到的全部的转换对象的集合,该集合中存储着一个流处理程序中全部的转换操作相应的StreamTransformation对象。

每当在DataStream对象上调用transform方法或者调用已经被实现了的一些内置的转换函数(如map、filter等。这些转换函数在内部也调用了transform方法)。这些调用都会使得其相应的转换对象被增加到transformations集合中去。StreamTransformation表示创建DataStream对象的转换,流处理程序中存在多种DataStream,每种底层都相应着一个StreamTransformation。DataStream持有运行环境对象的引用。当调用transform方法时,它会调用运行环境对象的addOperator方法,将特定的StreamTransformation对象增加到transformations集合中去。这就是transformations集合中元素的来源。

DataStream API的设计存在着多重对象的封装,我们以flatMap转换操作为例图示各种对象之间的构建关系:

技术分享

在Flink的源代码中,这些对象的命名也并非那么准确。比方上图中的SingleOutputStreamOperator事实上是一种DataStream,但却以Operator结尾,让人匪夷所思。因此较为准确的鉴定它们类型的方式是通过查看它们的继承链来进行识别。

StreamGraph的生成依赖于生成器StreamGraphGenerator,每调用一次静态方法generate才会在内部创建一个StreamGraphGenerator的实例。一个实例相应着一个StreamGraph对象。StreamGraphGenerator调用内部的实例方法generateInternal来遍历transformations集合的每一个对象:

private StreamGraph generateInternal(List<StreamTransformation<?

>> transformations) { for (StreamTransformation<?> transformation: transformations) { transform(transformation); } return streamGraph; }

在transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行推断。然后将其分发给特定的转换方法进行转换。终于返回当前StreamGraph对象中跟该转换有关的节点编号集合。

这里我们以经常使用的单输入转换方法transformOnInputTransform为例来进行分析:

private <IN, OUT> Collection<Integer> transformOnInputTransform(
    OneInputTransformation<IN, OUT> transform) {
    //递归地对该转换的输入端进行转换   
    Collection<Integer> inputIds = transform(transform.getInput());   
    // 递归调用可能会产生反复,这里须要以转换过的对象进行检查   
    if (alreadyTransformed.containsKey(transform)) {      
        return alreadyTransformed.get(transform);   
    }

    //结合输入端相应的节点编号来推断并得出槽共享组的名称   
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);   
    //将当前算子(节点)增加到流图中
    streamGraph.addOperator(transform.getId(),         
        slotSharingGroup,         
        transform.getOperator(),         
        transform.getInputType(),         
        transform.getOutputType(),         
        transform.getName());
    //假设有键选择器。则进行设置   
    if (transform.getStateKeySelector() != null) {      
        TypeSerializer<?> keySerializer = 
            transform.getStateKeyType().createSerializer(env.getConfig());      
        streamGraph.setOneInputStateKey(transform.getId(), 
            transform.getStateKeySelector(), keySerializer);   
    }   
    streamGraph.setParallelism(transform.getId(), transform.getParallelism()); 
    //构建从当前转换相应的节点到输入转换相应的节点之间的边  
    for (Integer inputId: inputIds) {      
        streamGraph.addEdge(inputId, transform.getId(), 0);   
    }   
    //返回当前转换相应的节点编号
    return Collections.singleton(transform.getId());
}

每遍历完一个转换对象,就离构建完整的流图更近一步。不同的转换操作类型,它们为流图提供的“部件”并不全然同样。有的转换仅仅构建节点(如SourceTransformation)。有的转换除了构建节点还构建边(如SinkTransformation),有的仅仅构建虚拟节点(如PartitionTransformation、SelectTransformation等)。

关于虚拟节点。这里须要说明的是并非全部转换操作都具有实际的物理意义(即物理上相应详细的算子)。有些转换操作仅仅是逻辑概念(比如select。split,partition。union),它们不会构建真实的StreamNode对象。比方某个流处理应用相应的转换树例如以下图:

技术分享

但在运行时,其生成的StreamGraph却是以下这样的形式:

技术分享

从图中能够看到,转换树中相应的一些逻辑操作在StreamGraph中并不存在,Flink将这些逻辑转换操作转换成了虚拟节点。它们的信息会被绑定到从source到map转换的边上。

Flink当前对于流处理的程序是不作优化的。所以StreamGraph就是它的运行计划。

你能够通过Flink提供的运行计划的可视化器将StreamGraph所表述的信息以图形化的方式展示出来。就像上文我们展示的那幅图一样。那么我们怎样查看我们自己所编写的程序的运行计划呢?事实上非常easy,我们以Flink源代码中flink-examples-streaming模块中的SocketTextStreamWordCount为例。来看一下怎样生成运行计划。

我们将SocketTextStreamWordCount最后一行代码凝视掉:

env.execute("WordCount from SocketTextStream Example");

然后将其替换成以下这句:

System.out.println(env.getExecutionPlan());

这行语句的作用是打印当前这个程序的运行计划,它将在控制台产生该运行计划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}

把上面这段JSON字符串拷贝到Flink的运行计划可视化器的输入框中。然后点击下方的“Draw”button,就可以生成。


微信扫码关注公众号:Apache_Flink

技术分享


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

技术分享

flink学习之流处理原理

💗💕💖今天是情人节,祝大家情人节快乐!今天我们来继续flink教程,我们今日要学习的是流处理的原理。注意一下,我这里的流处理指的不是flink的流处理,而是流处理的基础模型——Dataflow模... 查看详情

flink学习之流处理原理

💗💕💖今天是情人节,祝大家情人节快乐!今天我们来继续flink教程,我们今日要学习的是流处理的原理。注意一下,我这里的流处理指的不是flink的流处理,而是流处理的基础模型——Dataflow模... 查看详情

flink计算框架概述

...、提供测量的物理传感器、金融市场、机器的日志数据。执行模型实时处理是指当数据正在生成时连续执行的数据的处理过程。批处理是指在有限的时间内执行有限的数据的处理过程。不管采用哪种类型的执行模型来处理数据都... 查看详情

「flink」flink中的时间类型(代码片段)

...支持不同类型的时间。分为以下几种:处理时间Flink程序执行对应操作的系统时间。所有基于时间的操作(例如:时间窗口)都将使用运行相应operator的系统时间。例如:每个小时的处理时间窗口包括在系统时间范围内所有operator... 查看详情

存储过程之流程控制语句

...程序进行流程控制条件控制    IF条件:条件为真,执行    CASE条件:匹配到,执行循环控制    WHILE循环:先判断后执行    REPEAT循环:先执行后判断    LOOP循环(死循环)    LEAVE语句(离开)    ... 查看详情

flink安装部署

...为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。Flink特点1)随处部署应用与其它组件集成!flink是分布式系统,需要计算资源才可执行程序。flink可以与常见的集群资源管理器进行集成(HadoopYarn,ApacheMesos..)。可... 查看详情

flink并行度

并行执行本节介绍如何在Flink中配置程序的并行执行。FLink程序由多个任务(转换/操作符、数据源和sinks)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行性... 查看详情

apacheflink是啥?

...分布式开源数据处理框架。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。虽然,spark和storm的计算框架非常成熟,但... 查看详情

生成数据通信流图(C 程序,分析)[关闭]

】生成数据通信流图(C程序,分析)[关闭]【英文标题】:Generatedatacommunicationflowgraph(Cprogram,profiling)[closed]【发布时间】:2014-01-1018:10:42【问题描述】:我希望能够生成一个通信/调用图,其中包含有关函数之间传递的数据量的信... 查看详情

flink窗口延迟数据处理allowedlateness

...据是指数据记录元素到达算子后,本应该参与的计算已经执行完毕。在事件时间窗口算子中,如果数据记录元素到达算子时窗口分配器为其分配的窗口因为算 查看详情

flink窗口延迟数据处理allowedlateness

...据是指数据记录元素到达算子后,本应该参与的计算已经执行完毕。在事件时间窗口算子中,如果数据记录元素到达算子时窗口分配器为其分配的窗口因为算 查看详情

flink架构、原理

...ream进行计算处理,输出一个或多个结果Stream。Flink程序被执行的时候,它会被映射为StreamingDataflow。一个StreamingDataflow是由一组Stream和TransformationOperator组成,它类似于一个DAG图,在启动的时 查看详情

java基础之流程控制

...、顺序结构    顺序结构的程序语句只能被执行一次。如果您想要同样的操作执行多次,,就需要使用循环结构。  if-else-if语句语法:  if(条件){        当条件为true时,执行大括号内的... 查看详情

flinkonyarn时,如何确定taskmanager数

...)和任务槽(TaskSlot)。与Spark类似地,一个FlinkJob在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据... 查看详情

cdh6.3.2集成flink的部署配置(代码片段)

...写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运 查看详情

soot生成控制流图

1.将soot.jar文件复制到工程bin目录下;2.在cmd中执行如下命令java-cpsoot-trunck.jarsoot.tools.CFGViewer--soot-classpath.;"%JAVA_HOME%"\jre\lib\rt.jarcom.wauoen.paper.classes.Activity其中,JAVA_HOME是jdk目录;com.wauoen.p 查看详情

java程序猿之流程控制与数组

...码块用花括号括起来,一个代码块通常被当成一个整体来执行(除非遇到return、break、continue等关键字,或者遇到异常)。  if……else语句有一条基本规则:总是优先把范围小的条件放在前面处理。  switch语句由一个... 查看详情

flink运行架构-运行组件介绍

...作业管理器(JobManager)JobManager控制一个应用程序执行的主进程叫JobMaster,提交的每一个应用程序都会被一个不同的JobMaster所控制执行。JobMaster会先接收到要执行的应用程序,这个应用程序会包括:作业图࿰... 查看详情