第13课:sparkstreaming源码解读之driver容错安全性

     2022-05-18     764

关键词:

本期内容:

  1. ReceivedBlockTracker容错安全性

  2. DStream和JobGenerator容错安全性


Driver的容错有两个层面:1. Receiver接收数据的元数据 2. Driver管理的各组件信息(调度和驱动层面)


元数据采用了WAL的容错机制

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
  
  ...
  
  /** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}


元数据其实是交由ReceivedBlockTracker管理的。

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

首先会调用writeToLog方法:

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}


然后再将数据写入streamIdToUnallocatedBlockQueue 队列中。


每隔batchInterval时间后,Streaming的job被触发运行。此时要将streamIdToUnallocatedBlockQueue队列中的数据分配给具体的某个time。

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

在此过程中也会写WAL日志


JobGenerator在每隔batchInterval时间,会被触发产生job

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

最后往消息循环队列中放一个DoCheckpoint的消息。

JobGenerator接到消息后:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

根据ssc和time生成了一个Checkpoint对象。而ssc中有Driver的一切信息。所以当Driver崩溃后,能够根据Checkpoint数据来恢复Driver。

恢复的代码如下:

/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): "
    + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
    pendingTimes.mkString(", "))
  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}



备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


本文出自 “叮咚” 博客,请务必保留此出处http://lqding.blog.51cto.com/9123978/1775946

第15课:sparkstreaming源码解读之noreceivers彻底思考

本期内容:DirectAccessKafka前面有几期我们讲了带Receiver的SparkStreaming应用的相关源码解读。但是现在开发SparkStreaming的应用越来越多的采用NoReceivers(DirectApproach)的方式,NoReceiver的方式的优势: 1.更强的控制自由度 2.语义... 查看详情

第12课:sparkstreaming源码解读之executor容错安全性

一、SparkStreaming数据安全性的考虑:SparkStreaming不断的接收数据,并且不断的产生Job,不断的提交Job给集群运行。所以这就涉及到一个非常重要的问题数据安全性。SparkStreaming是基于SparkCore之上的,如果能够确保数据安全可好的话... 查看详情

(版本定制)第16课:sparkstreaming源码解读之数据清理内幕彻底解密

本期内容:  1、SparkStreaming元数据清理详解  2、SparkStreaming元数据清理源码解析一、如何研究SparkStreaming元数据清理操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手。因为DStream是R... 查看详情

(版本定制)第15课:sparkstreaming源码解读之noreceivers彻底思考

...式,操作数据来源使用一个封装器,且是RDD类型的。所以SparkStreaming就产生了自定义RDD–>KafkaRDD.源码分析:1、KafkaRDD源码private 查看详情

第14课:sparkstreaming源码解读之state管理之updatestatebykey和mapwithstate解密

...,那如果需要单词计数一直的累加下去,该如何实现呢?SparkStreaming提供了两种方法:updateStateByKey和mapWithState。mapWithState是1.6版本新增功能,目前属于实验阶段。mapWithState具官方说性能较update 查看详情

第12课:sparkstreaming源码解读之executor容错安全性

Receiver接收到的数据交由ReceiverSupervisorImpl来管理。ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTracker。Executor的数据容错可以有三种方式:WAL日志数据副本接收receiver的数据流回放/** Store bl... 查看详情

(版本定制)第7课:sparkstreaming源码解读之jobscheduler内幕实现和深度思考

...cheduler内幕实现  2、JobScheduler深度思考JobScheduler是SparkStreaming的调度核心,地位相当于SparkCore上调度中心的DAGScheduler,非常重要!JobGenerator每隔BatchDuration时间会动态的生成JobSet提交给JobScheduler,JobSche 查看详情

(版本定制)第14课:sparkstreaming源码解读之state管理之updatestatebykey和mapwithstate解密

...至一周的数据,这个时候不可避免的要进行状态管理,而SparkStreaming每个BatchDuractions都会产生一个Job,Jo 查看详情

(版本定制)第12课:sparkstreaming源码解读之executor容错安全性

...为什么不考虑数据计算的安全容错呢?原因是计算的时候SparkStreaming是借助于SparkCore上RDD的安全容错的,所以天然的安全可靠的。Executor的安全容错主要有:  1、数据副本:   查看详情

(版本定制)第11课:sparkstreaming源码解读之driver中的receivertracker彻底研究和思考

本期内容:  1、ReceiverTracker的架构设计  2、消息循环系统  3、ReceiverTracker具体实现上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker... 查看详情

第9课:sparkstreaming源码解读之receiver在driver的精妙实现全生命周期彻底研究和思考

在SparkStreaming中对于ReceiverInputDStream来说,都是现实一个Receiver,用来接收数据。而Receiver可以有很多个,并且运行在不同的worker节点上。这些Receiver都是由ReceiverTracker来管理的。在ReceiverTracker的start方法中,会创建一个消息通信体... 查看详情

第11课:sparkstreaming源码解读之driver中的receivertracker架构设计以及具体实现彻底研究

上节课将到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。一、ReceiverTracker主要的功能:在Executor上启动Receivers。停止Receivers。更新Receiver接收数... 查看详情

第10课:sparkstreaming源码解读之流数据不断接收全生命周期彻底研究和思考

上一课我们讲解了Receiver启动的流程。Receiver是通过ReceiverSupervisor的start方法启动的:/** Start the supervisor */def start() {  onStart()  startReceiver()}首先会调用ReceiverSuper 查看详情

(版本定制)第10课:sparkstreaming源码解读之流数据不断接收全生命周期彻底研究和思考

本期内容:  1、数据接收架构设计模式  2、数据接收源码彻底研究1、Receiver接受数据的过程类似于MVC模式:Receiver,ReceiverSupervisor和Driver的关系相当于Model,Control,View,也就是MVC。Model就是Receiver,存储数据Control... 查看详情

spark定制版:016~sparkstreaming源码解读之数据清理内幕彻底解密

本讲内容:a.SparkStreaming数据清理原因和现象b.SparkStreaming数据清理代码解析注:本讲内容基于Spark1.6.1版本(在2016年5月来说是Spark最新版本)讲解。上节回顾上一讲中,我们之所以用一节课来讲NoReceivers,是因为企业级SparkStreaming... 查看详情

spark版本定制八:sparkstreaming源码解读之rdd生成全生命周期彻底研究和思考

...RDD依靠什么生成?根据DStream来的RDD生成的依据是什么?SparkStreaming中RDD的执行是否和SparkCore中的RDD执行有所不同?运行之后我们对RDD怎么处理?ForEachDStream不一定会触发Job的执行, 查看详情

15sparkstreaming源码解读之noreceivers彻底思考

在前几期文章里讲了带Receiver的SparkStreaming应用的相关源码解读,但是现在开发SparkStreaming的应用越来越多的采用NoReceivers(DirectApproach)的方式,NoReceiver的方式的优势: 1.更强的控制自由度 2.语义一致性 其实NoReceivers... 查看详情

sparkstreaming源码解读之数据清理内幕彻底解密

本期内容:SparkStreaming数据清理原理和现象SparkStreaming数据清理代码解析   SparkStreaming一直在运行的,在计算的过程中会不断的产生RDD,如每秒钟产生一个BachDuration同时也会产生RDD,  在这个过程中除了基本的RDD外还有累... 查看详情