大数据之spark:spark底层执行原理(代码片段)

浊酒南街 浊酒南街     2022-12-04     305

关键词:

目录

1、Spark 运行流程


具体运行流程如下:

1、SparkContext 向资源管理器注册并向资源管理器申请运行 Executor

2、资源管理器分配 Executor,然后资源管理器启动 Executor

3、Executor 发送心跳至资源管理器

4、SparkContext 构建 DAG 有向无环图

5、将 DAG 分解成 Stage(TaskSet)

6、把 Stage 发送给 TaskScheduler

7、Executor 向 SparkContext 申请 Task

8、TaskScheduler 将 Task 发送给 Executor 运行

9、同时 SparkContext 将应用程序代码发放给 Executor

10、Task 在 Executor 上运行,运行完毕释放所有资源

1. 从代码角度看 DAG 图的构建

Val lines1 = sc.textFile(inputPath1).map(...).map(...)

Val lines2 = sc.textFile(inputPath2).map(...)

Val lines3 = sc.textFile(inputPath3)

Val dtinone1 = lines2.union(lines3)

Val dtinone = lines1.join(dtinone1)

dtinone.saveAsTextFile(...)

dtinone.filter(...).foreach(...)

上述代码的 DAG 图如下所示:

Spark 内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是如上图所示的 DAG。

Spark 的计算发生在 RDD 的 Action 操作,而对 Action 之前的所有 Transformation,Spark 只是记录下 RDD 生成的轨迹,而不会触发真正的计算。

2. 将 DAG 划分为 Stage 核心算法

一个 Application 可以有多个 job 多个 Stage:

Spark Application 中可以因为不同的 Action 触发众多的 job,一个 Application 中可以有很多的 job,每个 job 是由一个或者多个 Stage 构成的,后面的 Stage 依赖于前面的 Stage,也就是说只有前面依赖的 Stage 计算完毕后,后面的 Stage 才会运行。

划分依据:

Stage 划分的依据就是宽依赖,像 reduceByKey,groupByKey 等算子,会导致宽依赖的产生。
回顾下宽窄依赖的划分原则:
窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖。即一对一或者多对一的关系,可理解为独生子女。常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父 RDD 是 hash-partitioned)等。
宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。即一对多的关系,可理解为超生。常见的宽依赖有 groupByKey、partitionBy、reduceByKey、join(父 RDD 不是 hash-partitioned)等。
核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。

Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

3. 将 DAG 划分为 Stage 剖析


一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG)。

一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。

同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task)。

可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。

同时我们可以注意到,在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。

4. 提交 Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler 通过 TaskScheduler 接口提交任务集,这个任务集最终会触发 TaskScheduler 构建一个 TaskSetManager 的实例来管理这个任务集的生命周期,对于 DAGScheduler 来说,提交调度阶段的工作到此就完成了。

而 TaskScheduler 的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager 调度具体的任务到对应的 Executor 节点上进行运算。

5. 监控 Job、Task、Executor
1、DAGScheduler 监控 Job 与 Task:
要保证相互依赖的作业调度阶段能够得到顺利的调度执行,DAGScheduler 需要监控当前作业调度阶段乃至任务的完成情况。

这通过对外暴露一系列的回调函数来实现的,对于 TaskScheduler 来说,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler 根据这些任务的生命周期信息进一步维护作业和调度阶段的状态信息。

2、DAGScheduler 监控 Executor 的生命状态:
TaskScheduler 通过回调函数通知 DAGScheduler 具体的 Executor 的生命状态,如果某一个 Executor 崩溃了,则对应的调度阶段任务集的 ShuffleMapTask 的输出结果也将标志为不可用,这将导致对应任务集状态的变更,进而重新执行相关计算任务,以获取丢失的相关数据。

6. 获取任务执行结果

1、结果 DAGScheduler:
一个具体的任务在 Executor 中执行完毕后,其结果需要以某种形式返回给 DAGScheduler,根据任务类型的不同,任务结果的返回方式也不同。

2、两种结果,中间结果与最终结果:
对于 FinalStage 所对应的任务,返回给 DAGScheduler 的是运算结果本身。

而对于中间调度阶段对应的任务 ShuffleMapTask,返回给 DAGScheduler 的是一个 MapStatus 里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个调度阶段的任务获取输入数据的依据。

3、两种类型,DirectTaskResult 与 IndirectTaskResult:
根据任务结果大小的不同,ResultTask 返回的结果又分为两类:

如果结果足够小,则直接放在 DirectTaskResult 对象内中。

如果超过特定尺寸则在 Executor 端会将 DirectTaskResult 先序列化,再把序列化的结果作为一个数据块存放在 BlockManager 中,然后将 BlockManager 返回的 BlockID 放在 IndirectTaskResult 对象中返回给 TaskScheduler,TaskScheduler 进而调用 TaskResultGetter 将 IndirectTaskResult 中的 BlockID 取出并通过 BlockManager 最终取得对应的 DirectTaskResult。

7. 任务调度总体诠释

一张图说明任务总体调度:

2、Spark 运行架构特点

1. Executor 进程专属

每个 Application 获取专属的 Executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Tasks。

Spark Application 不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示:

2. 支持多种资源管理器

Spark 与资源管理器无关,只要能够获取 Executor 进程,并能保持相互通信就可以了。

Spark 支持资源管理器包含:Standalone、On Mesos、On YARN、Or On EC2。如图所示:

3. Job 提交就近原则

提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack(机架)里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换;

如果想在远程集群中运行,最好使用 RPC 将 SparkContext 提交给集群,不要远离 Worker 运行 SparkContext。
如图所示:

4. 移动程序而非移动数据的原则执行

移动程序而非移动数据的原则执行,Task 采用了数据本地性和推测执行的优化机制。

关键方法:taskIdToLocations、getPreferedLocations。
如图所示:

大数据技术之spark——spark运行模式(local+standalone+yarn)(代码片段)

文章目录前言一、Spark运行环境1.1、spark运行环境概述1.2、local模式1.2.1、上传spark压缩文件并解压1.2.2、启动local环境1.2.3、命令行工具(wordcount小测试)1.2.4、提交应用1.3、Standalone(独立部署模式)1.3.1、解压缩文件1.3.2、修... 查看详情

大数据之spark:spark数据倾斜

目录1.预聚合原始数据1.避免shuffle过程2.增大key粒度(减小数据倾斜可能性,增大每个task的数据量)2.预处理导致倾斜的key1.过滤2.使用随机key3.sample采样对倾斜key单独进行join3.提高reduce并行度1.reduce端并行度的设置2.redu... 查看详情

大数据之spark:spark的两种核心shuffle

目录二、SortShuffle解析1.普通运行机制2.bypass运行机制3.TungstenSortShuffle运行机制二、SortShuffle解析SortShuffleManager的运行机制主要分成三种:1、普通运行机制;2、bypass运行机制,当shufflereadtask的数量小于等于spark.shuffle.sor... 查看详情

大数据之spark:spark调优之shuffle调优

...大小在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下࿰ 查看详情

大数据之spark:spark的两种核心shuffle

...是连接Map与Reduce之间的桥梁,Map阶段通过Shuffle过程将数据输出到Reduce阶段中。由于Shuffle涉及磁盘的读写和网络I/O,因此Shuffl 查看详情

大数据之spark:spark调优之rdd算子调优

目录7.repartition/coalesce调节并行度8.reduceByKey本地预聚合9.使用持久化+checkpoint10.使用广播变量7.repartition/coalesce调节并行度我们知道Spark中有并行度的调节策略,但是,并行度的设置对于SparkSQL是不生效的,用户设置的... 查看详情

大数据之spark:spark调优之rdd算子调优

目录1.RDD复用2.尽早filter3.读取大量小文件-用wholeTextFiles4.mapPartition和foreachPartition1、mapPartitions2、foreachPartition5.filter+coalesce/repartition(减少分区)6.并行度设置1.RDD复用在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD... 查看详情

大数据之spark:spark基础

...he的顶级项目;Spark成功构建起了一体化、多元化的大数据处理体系。在任何规模的数据计算中,Spark在性能和扩展性上都更具优势;在FullStack理想的指引下,Spark中的SparkSQL、SparkStreaming、MLLib、GraphX、R五大子框架... 查看详情

大数据之spark:spark面试(初级)

目录1:介绍一下Spark2:谈一谈Spark的生态体系3:说说Spark的工作流程4:Spark运行模式有哪些?说说你最熟悉的一种5:谈谈YarnCluster和YarnClient模式的区别6:简单讲下RDD的特性7:RDD的宽依赖和窄依赖... 查看详情

ios之深入解析类方法+load与+initialize的底层原理

...门面向对象语言,有类和对象的概念。编译后,类相关的数据结构会保留在目标文件中,在运行时得到解析和使用。在应用程序运行起来的时候,类的信息会有加载和初始化过程。App启动到执行main函数之前,程序就执行了很多... 查看详情

spark从入门到精通

什么是Spark大数据计算框架离线批处理大数据体系架构图(Spark)Spark包含了大数据领域常见的各种计算框架:比如SparkCore用于离线计算,SparkSQL用于交互式查询,SparkStreaming用于实时流式计算,SparkMLib用于机器学习,SparkGraphX用于... 查看详情

面试系列五之项目涉及技术spark(代码片段)

一、Spark1.1Spark有几种部署方式?请分别简要论述1)Local:运行在一台机器上,通常是练手或者测试环境。2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。3)Yarn:Spark客户... 查看详情

orm之sqlarchemy底层实现原理(代码片段)

...对象关系映射】sqlarchemy底层是如何实现的?当我们需要对数据库进行操作时,是否可以依如下方式操作数据库--不要写sql语句,我要写python代码创建表  --------  创建类行数据  --------  对象操作行数据&... 查看详情

学习笔记spark——spark入门

...展、容错的集群计算框架;Spark是基于内存计算的大数据分布式计算框架;低延迟的复杂分析;Spark是HadoopMapReduce的替代方案。二、Spark的发展历史对于一个具有相当技术门槛与复杂度的平台,Spark从诞生到正式版... 查看详情

sparksql底层执行流程详解(代码片段)

...四、Catalyst的两大优化一、ApacheSparkApacheSpark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件... 查看详情

java同步阻塞队列之delayqueue实现原理,priorityqueue原理(代码片段)

...在指定的时间之后执行。其底层采用PriorityQueue作为底层数据结构。在讲解DelayQueue之前,我们需要先讲解一下PriorityQueue。PriorityQueue是一个优先级队列,在底层使用了一个可动态扩容的数组作为基础数据结构,实现了... 查看详情

spark能执行udf不能执行udaf,啥原因

...内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的mapreduce的算法。其架构如下图所示:Spark与Hadoop的对比Spark的中间数据放到内存中,对于迭代运算效率更高。Spark更适合于迭代运算比较多... 查看详情

spark能执行udf不能执行udaf,啥原因

...于MapReduceJob间输结保存内存再需要读写HDFSSpark能更适用于数据挖掘与机器习等需要迭代mapreduce算其架构图所示:图"class="ikqb_img_alink">Spark与Hadoop比Spark间数据放内存于迭代运算效率更高Spark更适合于迭代运算比较MLDM... 查看详情