Spark Source Code Analysis: SparkContext

Icedzzz     2022-11-24


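SparkContext: Submitting and running a user-developed Spark application always goes through SparkContext, so it must be initialized before the application is submitted. SparkContext hides network communication, distributed deployment, messaging, the storage system, the compute engine, the metrics system, the file service, the Web UI, and more, so application developers only need the API that SparkContext exposes. The two most important components inside SparkContext are the DAGScheduler and the TaskScheduler.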

Initializing SparkContext and creating the DAGScheduler and TaskScheduler

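SparkContext member variables: SparkConf, the event log directory, SparkEnv (holds the runtime objects of a running Spark instance, including the serializer, the RpcEnv, the block manager, and so on; Spark locates SparkEnv through a global variable, so every thread can access the same SparkEnv), SparkUI, the heartbeat receiver, the DAGScheduler and TaskScheduler, and others.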

  private var _conf: SparkConf = _
  private var _eventLogDir: Option[URI] = None
  private var _eventLogCodec: Option[String] = None
  private var _listenerBus: LiveListenerBus = _
  private var _env: SparkEnv = _
  private var _statusTracker: SparkStatusTracker = _
  private var _progressBar: Option[ConsoleProgressBar] = None
  private var _ui: Option[SparkUI] = None
  private var _hadoopConfiguration: Configuration = _
  private var _executorMemory: Int = _
  private var _schedulerBackend: SchedulerBackend = _
  private var _taskScheduler: TaskScheduler = _
  private var _heartbeatReceiver: RpcEndpointRef = _
  @volatile private var _dagScheduler: DAGScheduler = _
  private var _applicationId: String = _
  private var _applicationAttemptId: Option[String] = None
  private var _eventLogger: Option[EventLoggingListener] = None
  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
  private var _cleaner: Option[ContextCleaner] = None
  private var _listenerBusStarted: Boolean = false
  private var _jars: Seq[String] = _
  private var _files: Seq[String] = _
  private var _shutdownHookRef: AnyRef = _
  private var _statusStore: AppStatusStore = _
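
The remark that every thread can reach the same SparkEnv through a global variable can be illustrated with a small stand-alone model. The names `DemoEnv`, `DemoEnvHolder`, and `DemoEnvExample` are hypothetical and exist only for this sketch; Spark itself exposes the environment through the `SparkEnv` companion object (`SparkEnv.get`), whose internals are not reproduced here.

```scala
// Simplified model of a process-wide environment holder; not Spark's SparkEnv.
final case class DemoEnv(executorId: String, settings: Map[String, String])

object DemoEnvHolder {
  // @volatile makes a write by one thread visible to later reads on other threads
  @volatile private var env: Option[DemoEnv] = None

  def set(e: DemoEnv): Unit = { env = Some(e) }
  def get: Option[DemoEnv] = env
}

object DemoEnvExample {
  /** Sets the environment on the calling thread and reads it from a second thread. */
  def readFromOtherThread(): String = {
    DemoEnvHolder.set(DemoEnv("driver", Map("spark.app.name" -> "demo")))
    var seen = ""
    val reader = new Thread(new Runnable {
      override def run(): Unit = {
        seen = DemoEnvHolder.get.map(_.executorId).getOrElse("")
      }
    })
    reader.start()
    reader.join() // join() makes the reader's write to `seen` visible here
    seen
  }
}
```

Calling `DemoEnvExample.readFromOtherThread()` returns the executor id that was stored by the first thread, which is the property the article relies on.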

Initializing the context:

  1. Validate the SparkConf
    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }
    .....
  2. Set default parameters
    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }
    _statusTracker = new SparkStatusTracker(this, _statusStore)
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    ...
  3. Create the SparkEnv
    SparkEnv: the Spark execution environment (cache, map output tracker, and so on).
    The environments for the driver and for executors are both built by calling the create method:

    private[spark] def createDriverEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus,
        numCores: Int,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
      val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
      val port = conf.get("spark.driver.port").toInt
      val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
        Some(CryptoStreamUtils.createKey(conf))
      } else {
        None
      }
      create(
        conf,
        SparkContext.DRIVER_IDENTIFIER,
        bindAddress,
        advertiseAddress,
        Option(port),
        isLocal,
        numCores,
        ioEncryptionKey,
        listenerBus = listenerBus,
        mockOutputCommitCoordinator = mockOutputCommitCoordinator
      )
    }

The create method builds the execution environment, which includes the following (abridged):

  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Option[Int],
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    // Security manager
    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    // RPC environment
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)
    // Serializer manager
    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    // Broadcast manager
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    // Shuffle manager: short names map to a class name, anything else is taken as a class name
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
    // Memory manager: the unified memory model by default, the static (legacy) model optionally
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }
    // Block manager
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
    // Spark metrics system: periodically polls metric sources and reports to sinks
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    // Assemble the SparkEnv object that is returned
    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)
    ...
  }
  4. Important: create the TaskScheduler and DAGScheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    // Start the TaskScheduler once both schedulers have been initialized
    _taskScheduler.start()
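
Two of the initialization steps are small enough to model without Spark on the classpath: the required-key check on the configuration and the resolution of the shuffle manager's short name. The sketch below is only an approximation under stated assumptions: a plain `Map[String, String]` stands in for SparkConf, `IllegalArgumentException` stands in for SparkException, and `InitSketch` is a hypothetical name.

```scala
import java.util.Locale

object InitSketch {
  /** Mirrors the two required-key checks; `conf` is a plain Map standing in for SparkConf. */
  def validate(conf: Map[String, String]): Unit = {
    if (!conf.contains("spark.master")) {
      throw new IllegalArgumentException("A master URL must be set in your configuration")
    }
    if (!conf.contains("spark.app.name")) {
      throw new IllegalArgumentException("An application name must be set in your configuration")
    }
  }

  // Both short names point at the same implementation, as in the excerpt above
  private val shortShuffleMgrNames = Map(
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  /** Resolves a short alias to a class name; unknown names are treated as class names. */
  def shuffleManagerClass(conf: Map[String, String]): String = {
    val name = conf.getOrElse("spark.shuffle.manager", "sort")
    shortShuffleMgrNames.getOrElse(name.toLowerCase(Locale.ROOT), name)
  }
}
```

Because the lookup lowercases the configured name and falls back to the name itself, a user can pass either a short alias in any letter case or a fully qualified class name of their own.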

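Creating the TaskScheduler:
Only standalone mode is discussed here:
A TaskSchedulerImpl is created, and a StandaloneSchedulerBackend is placed under its control. StandaloneSchedulerBackend packs appName, maxCores, executorMemory, and related information into an ApplicationDescription case class and creates an AppClient. The AppClient calls tryRegisterAllMasters to register with every Master; as soon as a connection to one Master succeeds, the remaining registration attempts are cancelled.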

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    master match {
      ...
      case SPARK_REGEX(sparkUrl) =>
        // 1. Create TaskSchedulerImpl, the object that actually performs task scheduling
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // 2. StandaloneSchedulerBackend wraps TaskSchedulerImpl and is controlled by it
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
      ...
    }
  }
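
The `SPARK_REGEX` case accepts a comma-separated list of masters behind a single `spark://` prefix and expands it into one URL per master. A minimal stand-alone sketch of that parsing follows; `MasterUrlSketch` and `standaloneMasters` are hypothetical names, and the pattern is assumed to have the shape `spark://(.*)`.

```scala
object MasterUrlSketch {
  // Assumed shape of the standalone master pattern: everything after "spark://" is captured
  private val SparkRegex = """spark://(.*)""".r

  /** Returns one spark:// URL per master, or None if `master` is not a standalone URL. */
  def standaloneMasters(master: String): Option[List[String]] = master match {
    case SparkRegex(sparkUrl) => Some(sparkUrl.split(",").map("spark://" + _).toList)
    case _ => None
  }
}
```

For example, `"spark://h1:7077,h2:7077"` yields the two URLs `spark://h1:7077` and `spark://h2:7077`, which is what allows the client to try several masters in a high-availability setup.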

StandaloneSchedulerBackend:
StandaloneSchedulerBackend receives the SparkContext, reads the SparkConf from it, and does its work in the start method:

  override def start() {
    // 3. Wrap the application's information in appDesc
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // 4. Create the AppClient and pass in appDesc
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    // Wait until registration with a Master succeeds
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

StandaloneAppClient
StandaloneAppClient takes the rpcEnv, the URLs of all Masters, and the application description, and sends the application information to the cluster manager (the Master in standalone mode):

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              // Return as soon as any Master has accepted the registration
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            // 5. Send the RegisterApplication message to every Master over RPC
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
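
The registration pattern above (one attempt per master on a thread pool, with a shared flag so later attempts are skipped once one succeeds) can be sketched in isolation. This is a simplified model under stated assumptions: `RegistrationSketch` and `registerWithAny` are hypothetical names, and `tryConnect` is a stand-in for the RegisterApplication RPC that simply reports whether a master accepted.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object RegistrationSketch {
  /**
   * Tries every master on its own pool thread and returns the first one that accepted.
   * `tryConnect` is a hypothetical stand-in for sending RegisterApplication over RPC.
   */
  def registerWithAny(masters: Seq[String], tryConnect: String => Boolean): Option[String] = {
    val registeredWith = new AtomicReference[Option[String]](None)
    val pool = Executors.newFixedThreadPool(math.max(1, masters.size))
    try {
      val futures = masters.map { master =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            // Skip the attempt once another thread has already registered
            if (registeredWith.get.isEmpty && tryConnect(master)) {
              // Only the first successful thread wins; None is a singleton, so CAS is safe
              registeredWith.compareAndSet(None, Some(master))
            }
          }
        })
      }
      futures.foreach(_.get(5, TimeUnit.SECONDS)) // wait for every attempt to finish
    } finally {
      pool.shutdownNow()
    }
    registeredWith.get
  }
}
```

Unlike the excerpt, this sketch waits for all attempts before returning; the real client instead keeps the futures so that outstanding attempts can be cancelled after a Master replies.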

Finally, initialize sets up the task scheduling policy (FIFO or FAIR):

  private def createTaskScheduler
    ....
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
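
To give a feel for what the FIFO mode means for ordering, here is a minimal sketch. It assumes that FIFO compares task sets by priority first (taken here to be the job id, so earlier jobs go first) and breaks ties by stage id; `FifoSketch` and `TaskSet` are hypothetical names, and the FAIR policy, which also weighs pool shares and weights, is not modeled.

```scala
object FifoSketch {
  // Hypothetical stand-in for a schedulable task set; priority is assumed to be the job id
  final case class TaskSet(name: String, priority: Int, stageId: Int)

  /** FIFO order: lower priority value first, ties broken by lower stage id. */
  def fifoOrder(queue: Seq[TaskSet]): List[TaskSet] =
    queue.sortBy(ts => (ts.priority, ts.stageId)).toList
}
```

With this ordering, all stages of an earlier job are offered resources before any stage of a later job, which is why a long-running job can delay short ones under FIFO.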

DAGScheduler:
The most important part of DAGScheduler is DAGSchedulerEventProcessLoop, which receives the various events and handles communication with the other components.

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    ...
    ...
  }

  // If the loop itself fails, cancel all jobs and stop the SparkContext
  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }
  ...
}
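
The event-loop idea behind DAGSchedulerEventProcessLoop (a blocking queue drained by one dedicated thread, with `onReceive` and `onError` hooks) can be shown with a small self-contained model. `MiniEventLoop`, `MiniEventLoopExample`, and the demo events are hypothetical names for this sketch; it is not Spark's EventLoop class and leaves out details such as an `onStop` hook.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

/** Minimal single-threaded event loop; a simplified model, not Spark's EventLoop. */
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val worker = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until an event is posted
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupted the blocking take()
      }
    }
  }

  def start(): Unit = worker.start()
  def stop(): Unit = if (stopped.compareAndSet(false, true)) worker.interrupt()
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

object MiniEventLoopExample {
  sealed trait DemoEvent
  final case class JobSubmitted(jobId: Int) extends DemoEvent
  case object AllJobsCancelled extends DemoEvent

  /** Posts two events and returns what the loop thread handled, in order. */
  def run(): List[String] = {
    val handled = ListBuffer.empty[String] // written only by the loop thread
    val done = new CountDownLatch(2)
    val loop = new MiniEventLoop[DemoEvent]("demo-event-loop") {
      override protected def onReceive(event: DemoEvent): Unit = {
        event match {
          case JobSubmitted(id) => handled += s"submitted-$id"
          case AllJobsCancelled => handled += "cancelled"
        }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = done.countDown()
    }
    loop.start()
    loop.post(JobSubmitted(1))
    loop.post(AllJobsCancelled)
    done.await(5, TimeUnit.SECONDS) // the latch also publishes the loop thread's writes
    loop.stop()
    handled.toList
  }
}
```

Because a single thread drains the queue, handlers never run concurrently with each other, so the scheduler's state needs no extra locking inside the handlers.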

Running a job with SparkContext

  1. When an action is triggered, Spark calls SparkContext's runJob:
    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
  2. The job is submitted to the DAGScheduler
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      // Checkpoint this RDD now that the job has finished; the call recurses into the parent RDDs
      rdd.doCheckpoint()
    }
    ....
    // DAGScheduler.runJob
    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      ....
    }
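
The way collect builds on runJob (apply a function to every partition, then concatenate the per-partition arrays) can be mimicked without a cluster. In this sketch the partitions are plain in-memory sequences, and `RunJobSketch` is a hypothetical name; scheduling, serialization, and TaskContext are deliberately left out.

```scala
import scala.reflect.ClassTag

object RunJobSketch {
  /** Applies `func` to each partition and returns one result per partition. */
  def runJob[T, U: ClassTag](partitions: Seq[Seq[T]], func: Iterator[T] => U): Array[U] =
    partitions.map(p => func(p.iterator)).toArray

  /** collect(): turn each partition into an array, then concatenate the arrays in order. */
  def collect[T: ClassTag](partitions: Seq[Seq[T]]): Array[T] = {
    val results = runJob(partitions, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
}
```

The result keeps partition order, so the elements come back in the same order in which the partitions were listed, and an empty partition simply contributes nothing.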

Submitting the job: submitJob

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // JobWaiter waits for the job to finish
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Post the event; eventProcessLoop matches on it
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
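
The role of the waiter returned by submitJob can be sketched as a counter plus a promise: each finished partition passes its result to the handler, and the promise completes when the last partition reports. `MiniJobWaiter` and `MiniJobWaiterExample` are hypothetical names for this simplified model; it omits the link back to the scheduler that the real JobWaiter keeps for cancellation.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

/** Simplified model of a job waiter: completes once every partition has reported. */
class MiniJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  // A job with zero tasks is finished immediately
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise[Unit]()

  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: U): Unit = {
    synchronized { resultHandler(index, result) } // handler calls are serialized
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.success(())
  }

  def jobFailed(cause: Exception): Unit = { jobPromise.tryFailure(cause); () }
}

object MiniJobWaiterExample {
  /** Feeds three partition results into a waiter and returns (results, finishedOnlyAtEnd). */
  def run(): (List[Int], Boolean) = {
    val results = new Array[Int](3)
    val waiter = new MiniJobWaiter[Int](3, (index, value) => { results(index) = value })
    waiter.taskSucceeded(2, 30)
    waiter.taskSucceeded(0, 10)
    val finishedEarly = waiter.completionFuture.isCompleted
    waiter.taskSucceeded(1, 20)
    (results.toList, !finishedEarly && waiter.completionFuture.isCompleted)
  }
}
```

Results may arrive in any partition order; the index passed to the handler is what lets the caller place each result in the right slot.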

// The event matches JobSubmitted in DAGSchedulerEventProcessLoop,
// which calls dagScheduler.handleJobSubmitted
...
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    ...
}