sparkcoretaskscheduler源代码与任务提交原理浅析2

yangykaifa yangykaifa     2022-09-09     441

关键词:

引言

上一节《TaskScheduler源代码与任务提交原理浅析1》介绍了TaskScheduler的创建过程,在这一节中,我将承接《Stage生成和Stage源代码浅析》中的submitMissingTasks函数继续介绍task的创建和分发工作。

DAGScheduler中的submitMissingTasks函数

假设一个Stage的全部的parent stage都已经计算完毕或者存在于cache中。那么他会调用submitMissingTasks来提交该Stage所包括的Tasks。
submitMissingTasks负责创建新的Task。


Spark将由Executor运行的Task分为ShuffleMapTask和ResultTask两种。
每一个Stage生成Task的时候依据Stage中的isShuffleMap标记确定是否为ShuffleMapStage,假设标记为真。则这个Stage输出的结果会经过Shuffle阶段作为下一个Stage的输入。创建ShuffleMapTask;否则是ResultStage,这样会创建ResultTask。Stage的结果会输出到Spark空间。最后,Task是通过taskScheduler.submitTasks来提交的。

计算流程

submitMissingTasks的计算流程例如以下:

  1. 首先得到RDD中须要计算的partition,对于Shuffle类型的stage,须要推断stage中是否缓存了该结果;对于Result类型的Final Stage,则推断计算Job中该partition是否已经计算完毕。

  2. 序列化task的binary。Executor能够通过广播变量得到它。每一个task运行的时候首先会反序列化。

    这样在不同的executor上运行的task是隔离的,不会相互影响。

  3. 为每一个须要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage。生成一个ResultTask类型的task。

  4. 确保Task是能够被序列化的。由于不同的cluster有不同的taskScheduler。在这里推断能够简化逻辑;保证TaskSet的task都是能够序列化的。
  5. 通过TaskScheduler提交TaskSet。

部分代码

以下是submitMissingTasks推断是否为ShuffleMapStage的部分代码。其中部分參数说明在凝视中:

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        //stage.id:Stage的序号
        //taskBinary:这个在以下详细介绍
        //part:RDD相应的partition
        //locs:最适合的运行位置
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        //p:partition索引,表示从哪个partition读取数据
        //id:输出的分区索引,表示reduceID
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

关于taskBinary參数:这是RDD和ShuffleDependency的广播变量(broadcase version)。作为序列化之后的结果。
这里将RDD和其依赖关系进行序列化。在executor运行task之前再进行反序列化。这样的方式对不同的task之间提供了较好的隔离。

以下是submitMissingTasks进行任务提交的部分代码:

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }

TaskSchedulerImpl中的submitTasks

submitTasks的流程例如以下:

  1. 任务(tasks)会被包装成TaskSetManager(由于TaskSetManager不是线程安全的。所以源代码中须要进行同步)
  2. TaskSetManager实例通过schedulableBuilder(分为FIFOSchedulableBuilder和FairSchedulableBuilder两种)投入调度池中等待调度
  3. 任务提交同一时候启动定时器,假设任务还未被运行。定时器会持续发出警告直到任务被运行
  4. 调用backend的reviveOffers函数。向backend的driverActor实例发送ReviveOffers消息,driveerActor收到ReviveOffers消息后。调用makeOffers处理函数
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

TaskSetManager调度

每一个Stage一经确认,生成相应的TaskSet(即为一组tasks),其相应一个TaskSetManager通过Stage回溯到最源头缺失的Stage提交到调度池pool中。在调度池中,这些TaskSetMananger又会依据Job ID排序。先提交的Job的TaskSetManager优先调度。然后一个Job内的TaskSetManager ID小的先调度,而且假设有未运行完的父母Stage的TaskSetManager。则不会提交到调度池中。

reviveOffers函数代码

以下是CoarseGrainedSchedulerBackend的reviveOffers函数:

  override def reviveOffers() {
    driverActor ! ReviveOffers
  }

driveerActor收到ReviveOffers消息后,调用makeOffers处理函数。

DriverActor的makeOffers函数

makeOffers函数的处理逻辑是:

  1. 找到空暇的Executor,分发的策略是随机分发的,即尽可能将任务平摊到各个Executor
  2. 假设有空暇的Executor。就将任务列表中的部分任务利用launchTasks发送给指定的Executor

SchedulerBackend(这里实际是CoarseGrainedSchedulerBackend)负责将新创建的Task分发给Executor,从launchTasks代码中能够看出。在发送LauchTasks指令之前须要将TaskDescription序列化。

    // Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

TaskSchedulerImpl中的resourceOffers函数

任务是随机分发给各个Executor的,资源分配的工作由resourceOffers函数处理。
正如上面submitTasks函数提到的。在TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理。全部的TaskSetManager经由SchedulableBuilder依据特定的调度策略进行排序,在TaskSchedulerImpl的resourceOffers函数中,当前被选择的TaskSetManager的ResourceOffer函数被调用并返回包括了序列化任务数据的TaskDescription。最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去运行

resourceOffers主要做了3件事:

  1. 从Workers里面随机抽出一些来运行任务。
  2. 通过TaskSetManager找出和Worker在一起的Task,最后编译打包成TaskDescription返回。
  3. 将Worker–>Array[TaskDescription]的映射关系返回。
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // 遍历worker提供的资源。更新executor相关的映射
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }
    // 从worker其中随机选出一些来,防止任务都堆在一个机器上
    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // getSortedTask函数对taskset进行排序
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer。把本地性最高的Task分给Worker
    // 本地性是依据当前的等待时间来确定的任务本地性的级别。
    // 它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

    //1. 首先依次遍历 sortedTaskSets, 并对于每一个 Taskset, 遍历 TaskLocality
    //2. 越 local 越优先, 找不到(launchedTask 为 false)才会到下个 locality 级别
    //3. (封装在resourceOfferSingleTaskSet函数)在多次遍历offer list,
    //由于一次taskSet.resourceOffer仅仅会占用一个core, 
    //而不是一次用光全部的 core, 这样有助于一个 taskset 中的 task 比較均匀的分布在workers上
    //4. 仅仅有在该taskset, 该locality下, 对全部worker offer都找不到合适的task时, 
    //才跳到下个 locality 级别
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

TaskDescription代码:

private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}

DriverActor的launchTasks函数

launchTasks函数流程:

  1. launchTasks函数将resourceOffers函数返回的TaskDescription信息进行序列化
  2. 向executorActor发送封装了serializedTask的LaunchTask消息

由于受到Akka Frame Size尺寸的限制。假设发送数据过大,会被截断。

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

參考资料

Spark大数据处理,高彦杰著,机械工业出版社
Spark技术内幕: Task向Executor提交的源代码解析
Spark源代码系列(三)作业运行过程

转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页

seay源代码审计系统使用

Seay源代码审计系统简介Seay源代码审计系统使用如何使用“Seay源代码审计系统”扫描源代码漏洞Seay源代码审计系统下载安装github-Seay源代码审计系统 查看详情

源代码文件

函数定义,全局声明e和预处理命令组成了c程序源代码。对于小程序而言,源代码写在单个源代码文件中。较大的c程序可能包含多个源代码文件。因为函数定义通常依赖预处理和全局声明,所以源代码文件通常具有:(1)预处... 查看详情

源代码

相约西安 《大话数据结构》源代码《大话设计模式》源代码         ——程杰 查看详情

政务审计软件源代码怎么查

政务审计软件的源代码可以通过以下几种方法进行查找。一、使用搜索引擎。在网上使用搜索引擎,如Google、Bing等,可以搜索政务审计软件的源代码。搜索结果中会显示软件的源代码以及相关介绍,可以根据自己的需求进行选... 查看详情

如何从github获取源代码

如何从GitHub获取源代码github是当前流行的开源项目托管网站,里面有成千上万的项目值得学习和借鉴,可以把项目源代码下载到本地研究。本文介绍如何获取github的源代码。方法1-克隆(Clone)源代码到本地克隆之后会把源代码下载... 查看详情

在区分源代码,对象代码,汇编代码和机器代码时,我有一个困惑(代码片段)

我阅读了编写源代码的每个地方(高级语言),编译器将其转换为机器代码(低级语言)。然后我读到有一个汇编程序,它将汇编代码转换为机器代码。然后在区分编译器和解释器时,我读到编译器首先将整个代码转换为目标代... 查看详情

springboot源代码阅读系列之--源代码编译构建(代码片段)

工作多年一直没有潜心学习Spring的源代码,趁着该次疫情,准备学习下springboot源代码,本文是spring源代码阅读的初始篇-编译,目的是能成功构建spring源代码;1.代码下载笔者在GitHub-spring-projects/spring-boot:SpringBo... 查看详情

springboot源代码阅读系列之--源代码编译构建(代码片段)

工作多年一直没有潜心学习Spring的源代码,趁着该次疫情,准备学习下springboot源代码,本文是spring源代码阅读的初始篇-编译,目的是能成功构建spring源代码;1.代码下载笔者在GitHub-spring-projects/spring-boot:SpringBo... 查看详情

网页中设置禁止查看源代码(保护源代码)(代码片段)

开发网站的过程中有时我们不想让客户看到页面的源代码,甚至页面上的文字内容都不想被复制,下面我们来看一下怎么保护页面内容禁止查看页面源代码和禁止复制页面中的文字&lt;bodystyle="oncontextmenu="returnfalse"onselectstart="re... 查看详情

什么是开源

开源:是开放的源代码软件包括:源代码和目标代码源代码:是开放的源代码,任何人都可以拿来用目标代码开源是软件行业的一个用语开源,就是开放源代码软件包括源代码,和目标代码开放了源代码后,别人就可以非常简单... 查看详情

springboot源代码阅读系列之--源代码编译构建

工作多年一直没有潜心学习Spring的源代码,趁着该次疫情,准备学习下springboot源代码,本文是spring源代码阅读的初始篇-编译,目的是能成功构建spring源代码;1.代码下载笔者在GitHub-spring-projects/spring-boot:SpringBo... 查看详情

NFS 源代码

】NFS源代码【英文标题】:NFSsourcecode【发布时间】:2014-09-2919:00:16【问题描述】:我正在寻找NFS客户端和NFS服务器的源代码,以便在代码内部进行更改,但不幸的是我无法找到源代码。谁能帮忙找到他们的源代码?它们是位于li... 查看详情

如何从github获取源代码

...站,里面有成千上万的项目值得学习和借鉴,可以把项目源代码下载到本地研究。本文介绍获取github的源代码的方法:方法1-克隆(Clone)源代码到本地克隆之后会把源代码下载到本地,创建一个本地的代码库,可以任意在本地修改... 查看详情

如何从github获取源代码

...站,里面有成千上万的项目值得学习和借鉴,可以把项目源代码下载到本地研究。本文介绍如何获取github的源代码。方法1-克隆(Clone)源代码到本地克隆之后会把源代码下载到本地,创建一个本地的代码库,可以任意在本地修改代... 查看详情

查看源代码:显示 JSON 响应而不是 HTML 源代码

】查看源代码:显示JSON响应而不是HTML源代码【英文标题】:ViewSource:JSONResponseisshowinginsteadofHTMLsourcecode【发布时间】:2019-02-2407:22:39【问题描述】:因此,当我查看我的页面源代码时,将显示json响应(也就是我正在检索的表)... 查看详情

activemq从源代码构建

...过来用之而后快。只是我们也应该知道这些项目是怎样从源代码构建而来的。既然代码是写出来的,就不能避免有BUG存在,话说没有完美的软件,也没有无漏洞的程序。事实上从源代码构建。步骤不多,总的来说是件非常easy的... 查看详情

api的源代码

】api的源代码【英文标题】:Sourcecodeoftheapi【发布时间】:2016-02-2608:47:30【问题描述】:显然问题不清楚。我不是在寻找产品的源代码。我正在寻找api的来源。这些源代码在MobileFirstPlatformStudioforEclipse中提供。那么是否有一个公... 查看详情

ffmpeg源代码结构图-解码

...)[+]=====================================================FFmpeg的库函数源代码分析文章列表: 【架构图】FFmpeg源代码结构图-解码FFmpeg源代码结构图-编码 【通用】FFmpeg源代码简单分析:av_register_all()FFmpeg源代码简单 查看详情