override def receive: PartialFunction[Any, Unit]
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
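The launchTask method above follows a common pattern: wrap the work in a Runnable, register it in a map keyed by task id, then hand it to a thread pool. The following is a minimal sketch of that pattern only, not Spark's implementation; `DemoTaskRunner`, `DemoExecutor`, and the `finished` flag are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical stand-in for Spark's TaskRunner: a Runnable identified by a task id.
final class DemoTaskRunner(val taskId: Long, body: () => Unit) extends Runnable {
  @volatile var finished = false

  override def run(): Unit = {
    try {
      body()
    } finally {
      finished = true
    }
  }
}

// Hypothetical stand-in for the executor: tracks runners and submits them to a pool.
final class DemoExecutor {
  val runningTasks = new ConcurrentHashMap[Long, DemoTaskRunner]()
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(taskId: Long)(body: => Unit): DemoTaskRunner = {
    val tr = new DemoTaskRunner(taskId, () => body)
    runningTasks.put(taskId, tr) // register first so the task can be looked up later
    threadPool.execute(tr)       // then run it asynchronously on a pool thread
    tr
  }

  def shutdown(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Registering the runner before submitting it means a lookup by task id can never miss a task that has already started running.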
class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription)
  extends Runnable

Below is the main part of the source of the run() method.

override def run(): Unit = {
  threadId = Thread.currentThread.getId
  Thread.currentThread.setName(threadName)
  val threadMXBean = ManagementFactory.getThreadMXBean
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  var taskStartCpu: Long = 0
  startGCTime = computeTotalGcTime()

  try {
    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    // Set the properties that deserialization will use.
    Executor.taskDeserializationProps.set(taskDescription.properties)

    // Fetch over the network the files, resources, and jars that the task needs.
    updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
    // Finally, deserialize the Task object itself from taskDescription.serializedTask.
    // The context ClassLoader is passed so that classes are loaded and resolved against
    // the resources set up for this thread. (A class loader can also be used with
    // reflection to load a class dynamically and create an instance of it.)
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    val killReason = reasonIfKilled
    if (killReason.isDefined) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException(killReason.get)
    }

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    // Record the task's start time.
    taskStart = System.currentTimeMillis()
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
    /**
     * For a ShuffleMapTask, value is a MapStatus, which wraps the location of the
     * output that the ShuffleMapTask computed.
     * Later ShuffleMapTasks ask the MapOutputTracker for the output location of an
     * earlier ShuffleMapTask and then fetch the data over the network.
     * A ResultTask does the same, looking up the MapStatus locations of the
     * ShuffleMapTask results it depends on.
     * In short, the MapOutputTracker (the manager of map outputs) links the map side
     * to the action that consumes its output.
     */
    val value = try {
      // The method that actually executes the task; analyzed in detail below.
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

      if (freedMemory > 0 && !threwException) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }

      if (releasedLocks.nonEmpty && !threwException) {
        val errMsg =
          s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
            releasedLocks.mkString("[", ", ", "]")
        if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logInfo(errMsg)
        }
      }
    }
    task.context.fetchFailed.foreach { fetchFailure =>
      // uh-oh. it appears the user code has caught the fetch-failure without throwing any
      // other exceptions. Its *possible* this is what the user meant to do (though highly
      // unlikely). So we will log an error and keep going.
      logError(s"TID ${taskId} completed successfully though internally it encountered " +
        s"unrecoverable fetch failures!  Most likely this means user code is incorrectly " +
        s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
    }
    // Record the task's finish time.
    val taskFinish = System.currentTimeMillis()
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L

    // If the task has been killed, let's fail it.
    task.context.killTaskIfInterrupted()

    // Serialize and wrap the result (a MapStatus for a ShuffleMapTask) so that it can
    // be sent back and eventually reach the MapOutputTracker.
    val resultSer = env.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()

    // Deserialization happens in two parts: first, we deserialize a Task object, which
    // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
    /**
     * Compute the task's metrics: run time, deserialization time, JVM GC time,
     * and result serialization time.
     */
    task.metrics.setExecutorDeserializeTime(
      (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    task.metrics.setExecutorDeserializeCpuTime(
      (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // We need to subtract Task.run()'s deserialization time to avoid double-counting
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    task.metrics.setExecutorCpuTime(
      (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

    // Note: accumulator updates must be collected after TaskMetrics is updated
    val accumUpdates = task.collectAccumulatorUpdates()
    // TODO: do not serialize value twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    // Below, the result is serialized and wrapped with location information, which
    // makes network transfer and lookup easier. Note that BlockManager is Spark's
    // low-level component for managing data in memory and on disk.
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    // Call statusUpdate on the executor's CoarseGrainedExecutorBackend to report
    // the new status.
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

  } catch {
    case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
      val reason = task.context.fetchFailed.get.toTaskFailedReason
      if (!t.isInstanceOf[FetchFailedException]) {
        // there was a fetch failure in the task, but some user code wrapped that exception
        // and threw something else. Regardless, we treat it as a fetch failure.
        val fetchFailedCls = classOf[FetchFailedException].getName
        logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
          s"failed, but the ${fetchFailedCls} was hidden by another " +
          s"exception. Spark is handling this like a fetch failure and ignoring the " +
          s"other exception: $t")
      }
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
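The three-way branch that builds serializedResult in run() can be isolated as a pure decision on sizes. The sketch below assumes only what the excerpt shows: the comparison order and the names `resultSize`, `maxResultSize`, and `maxDirectResultSize`. The `ResultRoute` type and its cases are hypothetical labels, and the thresholds in the usage are arbitrary illustrative numbers.

```scala
// Hypothetical labels for the three branches in TaskRunner.run().
sealed trait ResultRoute
case object Dropped extends ResultRoute         // too large: only a block id and size are reported
case object ViaBlockManager extends ResultRoute // stored in the BlockManager, referenced by block id
case object Direct extends ResultRoute          // small enough to travel inside the status update

object ResultRouting {
  // Mirrors the order of checks in the excerpt; a maxResultSize of 0 disables the first check.
  def route(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize > maxDirectResultSize) ViaBlockManager
    else Direct
}
```

For example, with an illustrative `maxResultSize` of 100 and `maxDirectResultSize` of 50, a 10-byte result is routed as `Direct`, a 60-byte result as `ViaBlockManager`, and a 500-byte result as `Dropped`.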
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  // Create the context, the task's execution context. It records task-wide data such as
  // the attempt number, the stage the task belongs to, and the partition it processes.
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    localProperties,
    metricsSystem,
    metrics)
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()

  if (_reasonIfKilled != null) {
    kill(interruptThread = false, _reasonIfKilled)
  }

  new CallerContext(
    "TASK",
    SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
    appId,
    appAttemptId,
    jobId,
    Option(stageId),
    Option(stageAttemptId),
    Option(taskAttemptId),
    Option(attemptNumber)).setCurrentContext()

  try {
    // Call the abstract method runTask.
    runTask(context)
  } catch {
    case e: Throwable =>
      // Catch all errors; run task failure callbacks, and rethrow the exception.
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      context.markTaskCompleted(Some(e))
      throw e
  } finally {
    try {
      // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
      // one is no-op.
      context.markTaskCompleted(None)
    } finally {
      try {
        Utils.tryLogNonFatalError {
          // Release memory used by this thread for unrolling blocks
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
            MemoryMode.OFF_HEAP)
          // Notify any tasks waiting for execution memory to be freed to wake up and try to
          // acquire memory again. This makes impossible the scenario where a task sleeps forever
          // because there are no other tasks left to notify it. Since this is safe to do but may
          // not be strictly necessary, we should revisit whether we can remove this in the
          // future.
          val memoryManager = SparkEnv.get.memoryManager
          memoryManager.synchronized { memoryManager.notifyAll() }
        }
      } finally {
        // Though we unset the ThreadLocal here, the context member variable itself is still
        // queried directly in the TaskRunner to check for FetchFailedExceptions.
        TaskContext.unset()
      }
    }
  }
}
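Task.run is a template method: a final wrapper handles failure callbacks, completion callbacks, and cleanup, and delegates the real work to the abstract runTask. The sketch below reproduces only that control flow under simplified assumptions; `DemoTask`, its `events` log, and the event names are hypothetical, and the idempotent `markTaskCompleted` imitates the "second call is a no-op" behavior described in the source comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified analogue of Task[T]; not Spark's class.
abstract class DemoTask[T] {
  val events = ArrayBuffer.empty[String]
  private var completed = false

  // Subclasses supply the actual work, as ShuffleMapTask and ResultTask do via runTask.
  protected def runTask(): T

  // Idempotent: only the first call records anything.
  private def markTaskCompleted(error: Option[Throwable]): Unit = {
    if (!completed) {
      completed = true
      events += (if (error.isDefined) "completed-with-error" else "completed")
    }
  }

  final def run(): T = {
    try {
      runTask()
    } catch {
      case e: Throwable =>
        events += "failed"          // failure callbacks run first
        markTaskCompleted(Some(e))
        throw e                     // the caller still sees the original exception
    } finally {
      markTaskCompleted(None)       // no-op if the catch block already completed the task
      events += "cleanup"           // resources are released on both paths
    }
  }
}
```

Because run() is final, every subclass gets the same callback and cleanup ordering, whatever its runTask does.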
def runTask(context: TaskContext): T

def preferredLocations: Seq[TaskLocation] = Nil

// Map output tracker epoch. Will be set by TaskSetManager.
var epoch: Long = -1

// Task context, to be initialized in run().
@transient var context: TaskContextImpl = _

// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _

// If non-null, this task has been killed and the reason is as specified. This is used in case
// context is not yet initialized when kill() is invoked.
@volatile @transient private var _reasonIfKilled: String = null

protected var _executorDeserializeTime: Long = 0
protected var _executorDeserializeCpuTime: Long = 0

/**
 * If defined, this task has been killed and this option contains the reason.
 */
def reasonIfKilled: Option[String] = Option(_reasonIfKilled)

/**
 * Returns the amount of time spent deserializing the RDD and function to be run.
 */
def executorDeserializeTime: Long = _executorDeserializeTime
def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime

/**
 * Collect the latest values of accumulators used in this task. If the task failed,
 * filter out the accumulators whose values should not be included on failures.
 */
def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
  if (context != null) {
    // Note: internal accumulators representing task metrics always count failed values
    context.taskMetrics.nonZeroInternalAccums() ++
      // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
      // filter them out.
      context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
  } else {
    Seq.empty
  }
}
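The filtering rule in collectAccumulatorUpdates can be illustrated on plain data. This is a sketch under stated assumptions: `DemoAccum` is a hypothetical record, and "non-zero" is taken here to mean `value != 0`, which may be simpler than what Spark's `nonZeroInternalAccums()` actually checks.

```scala
// Hypothetical, simplified accumulator record; not Spark's AccumulatorV2.
final case class DemoAccum(name: String, value: Long, countFailedValues: Boolean)

object AccumFilter {
  def collect(
      internal: Seq[DemoAccum],
      external: Seq[DemoAccum],
      taskFailed: Boolean): Seq[DemoAccum] = {
    // Internal (task metric) accumulators: keep the non-zero ones, even on failure.
    val keptInternal = internal.filter(_.value != 0)
    // External accumulators: keep all on success, including zero values;
    // on failure keep only those that opted in with countFailedValues.
    val keptExternal = external.filter(a => !taskFailed || a.countFailedValues)
    keptInternal ++ keptExternal
  }
}
```

With this rule, a failed task still reports its internal metrics, while user accumulators are dropped unless they explicitly count failed values.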
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)