spark之task原理分析

那一抹风 那一抹风     2022-10-10     400

关键词:

在Spark中,一个应用程序要想被执行,肯定要经过以下的步骤:
    
    从这个路线得知,最终一个job是依赖于分布在集群不同节点中的task,通过并行或者并发的运行来完成真正的工作。由此可见,一个个的分布式的task才是Spark的真正执行者。下面先来张task运行框架整体的对Spark的task运行有个大概的了解。
    task运行之前的工作是Driver启动Executor,接着Executor准备好一切运行环境,并向Driver反向注册,最终Driver向Executor发送LunchTask事件消息,从Executor接受到LanchTask那一刻起,task就一发不可收拾了,开始通过java线程来进行以后的工作。当然了,在task正式工作之前,还有一些工作,比如根据stage算法划分好stage,根据task最佳位置计算算法寻找到task的最佳位置(第一期盼都是希望能够在同一个节点的同一个进程中有task所需要的需要,第二才是同一节点的不同进程,第三才是同一机架的不同节点,第四才是不同机架)。这样做的目的是减少网络通信的开销,节省CPU资源,提高系统性能。
 
    其实虽然图片看起来复杂,其实task所做的事情无非以下几点:
1.通过网络拉取运行所需的资源,并反序列化(由于多个task运行在多个Executor中,都是并行运行的,或者并发运行的,一个stage的task,处理的RDD是一样的,这是通过广播变量来完成的)
2.获取shuffleManager,从shuffleManager中获取shuffleWriter(shuffleWriter用于后面的数据处理并把返回的数据结果写入磁盘)
3.调用rdd.iterator(),并传入当前task要处理的partition(针对RDD的某个partition执行自定义的算子或逻辑函数,返回的数据都是通过上面生成的ShuffleWriter,经过HashPartitioner[默认是这个]分区之后写入对应的分区backet,其实就是写入磁盘文件中)
4.封装数据结果为MapStatus ,发送给MapOutputTracker,供ResultTask拉取。(MapStatus里面封装了ShuffleMaptask计算后的数据和存储位置地址等数据信息。其实也就是BlockManager相关信息,BlockManager 是Spark底层的内存,数据,磁盘数据管理的组件)
5.ResultTask拉取ShuffleMapTask的结果数据(经过2/3/4步骤之后的结果)
 
    实现这个过程,task有ShuffleMapTask和ResultTask两个子类task来支撑,前者是用于通过各种map算子和自定义函数转换RDD。后者主要是触发了action操作,把map阶段后的新的RDD拉取过去,再执行我们自定义的函数体,实现各种业务功能。
 
下面通过源码来分析整个流程:
CoarseGrainedExecutorBackend是executor粗粒度真正的后台处理进程。其中比较重要的是以下函数,主要是用于接受其他工作进程所发送的事件消息,并做对应的响应。
override def receive: PartialFunction[Any, Unit]
 
    executor接受到这个事件消息后,task才真正开始工作。其中的executor.launchTask(this, taskDesc)就是主要的实现函数体
case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskDesc)
      }
 
    launchTask方法,主要是new出一个TaskRunner线程,并把它放进java的线程池中运行。通过这里也知道其实Spark的底层是依赖Java和Scala共同实现的。
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
  }
 
    通过看TaskRunner的实现,知道它是继承Runnable的,因此,就知道线程真正的运行体是run()方法。
class TaskRunner(
      execBackend: ExecutorBackend,
      private val taskDescription: TaskDescription)
    extends Runnable
 
    下面是run( )方法的主要部分源码。
override def run(): Unit = {
      threadId = Thread.currentThread.getId
      Thread.currentThread.setName(threadName)
      val threadMXBean = ManagementFactory.getThreadMXBean
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTime = System.currentTimeMillis()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStart: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()
      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control).
        //对序列化的数据,进行反序列化
        Executor.taskDeserializationProps.set(taskDescription.properties)
        //通过网络通信的方法,把task运行所需要的文件、资源、jar等拉取过来
        updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
        //最后,通过正式的反序列化操作,将整个task的数据集拉取过来
        //这里用ClassLoader的原因是通过指定的上下文资源,进行加载和读取。(当然,反射还有另外的功能:通过反射放射动态加载一个类,创建类的对象)
        task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        task.localProperties = taskDescription.properties
        task.setTaskMemoryManager(taskMemoryManager)
        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        val killReason = reasonIfKilled
        if (killReason.isDefined) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException(killReason.get)
        }
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)
        // Run the actual task and measure its runtime.
        //计算task开始的时间
        taskStart = System.currentTimeMillis()
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        /**
          * value 对于ShuffleMapTask来说,就是MapStatus
          * 封装了ShuffleMapTask计算的数据,输出的位置
          * 后面的ShuffleMapTask会去联系MapOutputTracker来获取一个ShuffleMapTask的输出位置,通过网络网络拉取数据
          * ResultTask也是这样的,只不过是查询ShuffleMapTask的结果MapStatus的位置
                    *  总的来说 MapOutputTracker(Map输出任务管理器),把map和action联系起来了。
          */
        val value = try {
            //真正的task的线程执行方法,下面会详细分析
          val res = task.run( 
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } finally {
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
          if (freedMemory > 0 && !threwException) {
            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logWarning(errMsg)
            }
          }
          if (releasedLocks.nonEmpty && !threwException) {
            val errMsg =
              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
                releasedLocks.mkString("[", ", ", "]")
            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logInfo(errMsg)
            }
          }
        }
        task.context.fetchFailed.foreach { fetchFailure =>
          // uh-oh.  it appears the user code has caught the fetch-failure without throwing any
          // other exceptions.  Its *possible* this is what the user meant to do (though highly
          // unlikely).  So we will log an error and keep going.
          logError(s"TID ${taskId} completed successfully though internally it encountered " +
            s"unrecoverable fetch failures!  Most likely this means user code is incorrectly " +
            s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
        }
        //task结束的时间
        val taskFinish = System.currentTimeMillis()
        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        // If the task has been killed, let's fail it.
        task.context.killTaskIfInterrupted()
        //对MapStatus进行各种序列化和封装,后面要发送给MapOutputTracker
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        /**
          * 计算出task的一些统计信息,运行时间/反序列化消耗的时间/JAva虚拟机 GC消耗的时间/反序列化消耗的时间
          */
        task.metrics.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        task.metrics.setExecutorDeserializeCpuTime(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        task.metrics.setExecutorCpuTime(
          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
        task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
        // Note: accumulator updates must be collected after TaskMetrics is updated
        val accumUpdates = task.collectAccumulatorUpdates()
        // TODO: do not serialize value twice
        val directResult = new DirectTaskResult(valueBytes, accumUpdates)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit
        // directSend = sending directly back to the driver
        //下面是对map结果做序列化和对其做位置等信息的封装,方便网络传输和位置查找。注意,BlockManager 是Spark底层的内存,数据,磁盘数据管理的组件
        val serializedResult: ByteBuffer = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        //调用executor所在的ScoresGrainedExecutorBackend的statusUpdate,更新状态信息
        setTaskFinishedAndClearInterruptStatus()
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
          val reason = task.context.fetchFailed.get.toTaskFailedReason
          if (!t.isInstanceOf[FetchFailedException]) {
            // there was a fetch failure in the task, but some user code wrapped that exception
            // and threw something else.  Regardless, we treat it as a fetch failure.
            val fetchFailedCls = classOf[FetchFailedException].getName
            logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
              s"failed, but the ${fetchFailedCls} was hidden by another " +
              s"exception.  Spark is handling this like a fetch failure and ignoring the " +
              s"other exception: $t")
          }
          setTaskFinishedAndClearInterruptStatus()
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
    executor的task.run,底层主要是task的run方法,很明显看出来,主要工作是创建一个context,把task运行过程中的上下文记录下来。其中关键的是调用抽象方法,runTask。
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  //创建 context ,task的执行上下文,里面记录task执行的全局性的数据
  //重试次数,task属于哪个stage,task要处理的是哪个rdd,哪个partition等
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    localProperties,
    metricsSystem,
    metrics)
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()
 
  if (_reasonIfKilled != null) {
    kill(interruptThread = false, _reasonIfKilled)
  }
 
  new CallerContext(
    "TASK",
    SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
    appId,
    appAttemptId,
    jobId,
    Option(stageId),
    Option(stageAttemptId),
    Option(taskAttemptId),
    Option(attemptNumber)).setCurrentContext()
 
  try {
    //调用抽象方法,runTask
    runTask(context)
  } catch {
    case e: Throwable =>
      // Catch all errors; run task failure callbacks, and rethrow the exception.
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      context.markTaskCompleted(Some(e))
      throw e
  } finally {
    try {
      // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
      // one is no-op.
      context.markTaskCompleted(None)
    } finally {
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
            MemoryMode.OFF_HEAP)
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the
          // future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        // Though we unset the ThreadLocal here, the context member variable itself is still
        // queried directly in the TaskRunner to check for FetchFailedExceptions.
        TaskContext.unset()
      }
    }
  }
}
 
    task是抽象方法,意味着这个类只是模板类,仅仅封装了一些子类通用的属性和方法,依赖于子类实现它们,来确定具体的功能。 前面说过task的有两个子类ShuffleMapTask和ResultTask。有了它们,才能运行定义的算子和逻辑
def runTask(context: TaskContext): T
 
def preferredLocations: Seq[TaskLocation] = Nil
 
// Map output tracker epoch. Will be set by TaskSetManager.
var epoch: Long = -1
 
// Task context, to be initialized in run().
@transient var context: TaskContextImpl = _
 
// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _
 
// If non-null, this task has been killed and the reason is as specified. This is used in case
// context is not yet initialized when kill() is invoked.
@volatile @transient private var _reasonIfKilled: String = null
 
protected var _executorDeserializeTime: Long = 0
protected var _executorDeserializeCpuTime: Long = 0
 
/**
* If defined, this task has been killed and this option contains the reason.
*/
def reasonIfKilled: Option[String] = Option(_reasonIfKilled)
 
/**
* Returns the amount of time spent deserializing the RDD and function to be run.
*/
def executorDeserializeTime: Long = _executorDeserializeTime
def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime
 
/**
* Collect the latest values of accumulators used in this task. If the task failed,
* filter out the accumulators whose values should not be included on failures.
*/
def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
  if (context != null) {
    // Note: internal accumulators representing task metrics always count failed values
    context.taskMetrics.nonZeroInternalAccums() ++
      // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
      // filter them out.
      context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
  } else {
    Seq.empty
  }
}
 
    到此,task整个运行流程已分析一遍,最后,调用下面的函数来更新状态信息  
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
 
    最后来总结一下,task的运行一开始不是直接调用底层的task的run方法直接处理job-->stage-->taskSet-->task这条路线的task任务的,它是通过分层和分工的思想来完成。task会派生出两个子类ShuffleMapTask和ResultTask分别完成对应的工作,ShuffleMapTask主要是对task所拥有的的RDD的partition做对应的RDD转换工作,ResultTask主要是根据action动作触发,并拉取ShuffleMapTask阶段的结果做进一步的算子和逻辑函数对数据对真正进一步的处理。这两个阶段是通过MapOutputTracker来连接起来的。

spark基础之shuffle原理分析

一概述Shuffle是对数据重新组合和分配Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到... 查看详情

7.spark源码分析(基于yarncluster模式)-task划分提交(代码片段)

本系列基于spark-2.4.6通过上一节的分析,我们最后发现spark通过submitMissingTasks来提交Stage。这个章节我们来分析一下其实现以及Task的划分和提交。privatedefsubmitMissingTasks(stage:Stage,jobId:Int)valpartitionsToCompute:Seq[Int]=stage.findMis 查看详情

8.spark源码分析(基于yarncluster模式)-task执行,map端写入实现(代码片段)

本系列基于spark-2.4.6通过上一节分析,我们知道,task提交之后通过launchTasks来具体执行任务,这一章节我们看下其具体实现.privatedeflaunchTasks(tasks:Seq[Seq[TaskDescription]])for(task<-tasks.flatten)valserializedTask=Ta 查看详情

大数据之spark:sparkcore调优之数据倾斜调优

...调优概述二、数据倾斜发生时的现象三、数据倾斜发生的原理四、如何定位导致数据倾斜的代码五、某个task执行特别慢的情况六、某个task莫名其妙内存溢出的情况七、查看导致数据倾斜的key的数据分布情况数据倾斜的解决方案... 查看详情

spark核心原理之sparkcontext原理

...,这篇文章简要介绍spark1.X版本任务调度的基本部件及其原理,包括SparkContext,SparkEnv,Executor,TaskScheduler,DAGScheduler以及其他部件。本文参考自《深入理解Spark》(by耿嘉安)一书。1 SparkContextSparkDriver用于用户提交任务,SparkDri... 查看详情

spark核心原理之executor原理

Executor是Spark执行任务的进程,Spark启动Executor过程包括如下步骤:1)使用Spark-submit提交到集群,Master收到RequesSubmitDriver请求。2)Master调用scheduler把Driver程序发送到worker端执行。3)Driver执行时初始化SparkContext,创建AppClient,向Mast... 查看详情

spark架构原理介绍以及jobtaskstag概念

Spark运行模式一:Spark运行架构介绍  相关术语概念详解:    Application:指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多节点上运行的Executor代码。    Driver:Spark中的Driver就是运行Applic... 查看详情

spark之解决数据倾斜

介绍Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配... 查看详情

spark学习之路sparkcore的调优之数据倾斜调优(代码片段)

...558083目录调优概述数据倾斜发生时的现象数据倾斜发生的原理如何定位导致数据倾斜的代码某个task执行特别慢的情况某个task莫名其妙内存溢出的情况查看导致数据倾斜的key的数据分布情况数据倾斜的解决方案解决方案一:使用H... 查看详情

11.spark源代码分析(基于yarncluster模式)-聊聊stage和task(代码片段)

通过前面的分析,我们了解到,在Spark中只存在两种Stage:ResultStageShuffleMapStageStage之前划分的条件是遍历当前RDD和父RDD的依赖列表,如果遇到了ShuffleDependency则进行Stage的划分,Spark中最后一个Stage永远都是ResultS... 查看详情

深入理解spark-taskscheduler,schedulerbackend源码分析(代码片段)

 上次分析了dagshceduler是如何将任务拆分成job,stage,task的,但是拆分后的仅仅是一个逻辑结果,保存为一个resultstage对象,并没执行;而将任务正在执行的是spark的taskscheduler模块和shcedulerbackend模块,taskcheduler模块负责task的... 查看详情

spark技术内幕:task向executor提交的源代码解析

在上文《Spark技术内幕:Stage划分及提交源代码分析》中,我们分析了Stage的生成和提交。可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位... 查看详情

9.spark源代码分析(基于yarncluster模式)-task执行,reduce端读取shuffle数据文件(代码片段)

本系列基于spark-2.4.6通过上一节的分析,我们了解了Spark中ShuflleMapTask中Map端数据的写入流程,这个章节我们分析下Reduce端是如何读取数据的。在ShulleMapTask.runTask中,有这么一个步骤:writer.write(rdd.iterator(partition,cont... 查看详情

spark性能调优之合理设置并行度

Spark性能调优之合理设置并行度1.Spark的并行度指的是什么?   spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度!  当分配完所能分配的最大资源了,然后对应资源去调节程序的并... 查看详情

spark2.1.0之源码分析——事件总线

阅读提示:阅读本文前,最好先阅读《Spark2.1.0之源码分析——事件总线》、《Spark2.1.0事件总线分析——ListenerBus的继承体系》及《Spark2.1.0事件总线分析——SparkListenerBus详解》几篇文章的内容。LiveListenerBus继承了SparkListenerBus,... 查看详情

转载spark系列之运行原理和架构

参考 http://www.cnblogs.com/shishanyuan/p/4721326.html  1、 Spark运行架构1.1 术语定义lApplication:SparkApplication的概念和HadoopMapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多... 查看详情

spark之解决数据倾斜

介绍Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配... 查看详情

spark之解决数据倾斜

介绍Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配... 查看详情