Spark Streaming Source Code Analysis: Dynamic Job Generation and Deeper Thoughts

I. Deeper Thoughts on Spark Streaming Job Generation

Scheduled tasks can also be regarded as a form of stream processing: both combine time with a timer (or possibly some other trigger condition). In that sense, all processing is stream processing.

We take JobGenerator as the entry point: it encapsulates the dynamic generation of jobs, producing them per batchDuration based on the DStream dependency graph. Note that Spark's stream processing differs from Storm's: Storm computes each record as it flows in, whereas Spark Streaming performs batch processing over time intervals.

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

JobGenerator is only responsible for generating jobs; it does not execute them. Job submission is ultimately triggered through the RDD.
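Before diving into the source, a minimal WordCount driver sketch helps fix the moving parts in mind: where batchDuration enters, and how an output operation registers a ForEachDStream with the DStreamGraph. The host, port, and app name below are placeholders of my own, not from this article:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // batchDuration = Seconds(5): the timer will fire a GenerateJobs event every 5 seconds
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each transformation adds a DStream to the dependency graph (the DAG template)
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // print() registers a ForEachDStream with DStreamGraph as an output stream
    counts.print()

    ssc.start()            // JobGenerator / JobScheduler start here
    ssc.awaitTermination()
  }
}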

II. Source Code Analysis of Spark Streaming Job Generation

  1. JobGenerator is responsible for generating jobs
  2. JobScheduler is responsible for scheduling jobs
  3. RecurringTimer periodically fires the job-generation event

We begin the analysis with JobGenerator.

Note that JobScheduler also has an eventLoop message thread, which is mainly used (by JobHandler) to notify JobScheduler of events such as job start and job completion:

private[scheduler] sealed trait JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent

The message thread in JobGenerator:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  // Receives the various job events (e.g. GenerateJobs, ClearMetadata, DoCheckpoint)
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
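To make the message-thread pattern easier to see in isolation, here is a minimal, self-contained sketch of an EventLoop-style loop. This is my own simplification for illustration, not Spark's actual org.apache.spark.util.EventLoop:

import java.util.concurrent.LinkedBlockingQueue

// One daemon thread drains a queue and dispatches each event to onReceive.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = queue.take()
          try onReceive(event) catch { case e: Exception => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stopped while waiting; exit quietly
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}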

Let's focus on the processEvent method:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    // Generate jobs for the given batch time
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

There are four kinds of events here; let's trace into the generateJobs method:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Allocate the received blocks to this batch time (receiverTracker holds the block metadata)
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // DStreamGraph generates the jobs; DStreamGraph records the DAG of DStreams
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
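A detail worth noting: the value of a Try { ... } block is its last expression, so the Success branch above receives the Seq[Job] returned by graph.generateJobs(time), even though allocateBlocksToBatch also ran inside the block. A tiny standalone illustration:

import scala.util.{Failure, Success, Try}

val result: Try[Seq[String]] = Try {
  val allocated = Seq("block-1", "block-2") // stands in for allocateBlocksToBatch (side effect)
  allocated.map(b => s"job-for-$b")         // last expression becomes the Success value
}

result match {
  case Success(jobs) => println(s"generated ${jobs.length} jobs") // "generated 2 jobs"
  case Failure(e)    => println(s"failed: $e")
}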

Now we turn to DStreamGraph's generateJobs method.

flatMap is used here to drop the jobs that are None and flatten the return value:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // outputStreams holds the output streams registered with DStreamGraph each time we call foreachRDD
    // private val outputStreams = new ArrayBuffer[DStream[_]]()
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
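The flatMap-over-Option idiom can be seen in isolation: since Option converts implicitly to an iterable, the Nones vanish and the Somes are unwrapped. A standalone snippet of my own:

val jobOptions: Seq[Option[String]] = Seq(Some("job-0"), None, Some("job-1"))

// flatMap drops the Nones and unwraps the Somes, just like
// outputStreams.flatMap { outputStream => outputStream.generateJob(time) }
val jobs: Seq[String] = jobOptions.flatMap(opt => opt)

println(jobs) // List(job-0, job-1)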

Continuing into outputStream.generateJob(time) (note that this outputStream is an instance of ForEachDStream):

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  // This call generates the RDD from the DStream
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        // Note that submission is based on the RDD; the actual submission to the DAG
        // scheduler is wrapped inside a function, so it does not run immediately
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

Next we look at the getOrCompute method of the DStream class to trace how the RDD is generated:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  // Note: DStream is an abstract class, so every concrete DStream has its own generatedRDDs map.
  // The DStream transformations we write ultimately act on the very first RDD; each DStream
  // holds its own RDD instance, and at compute time only the last RDD is needed.
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Every concrete DStream has its own RDDs!
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
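The caching pattern at the top of getOrCompute (look up generatedRDDs, otherwise compute and insert) is ordinary memoization keyed by batch time. A stripped-down sketch of just that pattern, using names of my own invention:

import scala.collection.mutable

// Each DStream instance keeps its own time -> RDD map; repeated lookups for the
// same batch time reuse the cached value instead of recomputing the lineage.
class BatchCache[R] {
  private val generated = new mutable.HashMap[Long, R]()

  def getOrCompute(time: Long)(compute: => Option[R]): Option[R] =
    generated.get(time).orElse {
      val computed = compute // only evaluated on a cache miss
      computed.foreach(r => generated.put(time, r))
      computed
    }
}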

From the code above we see that everything ultimately lands on the key method compute(time). Since this method is abstract, we need to find its implementations in the subclasses. Taking a WordCount program as an example, our last stream is a ForEachDStream:

// ForEachDStream, line 46
override def generateJob(time: Time): Option[Job] = {
  // Here parent is the ShuffledDStream
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
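Between the output stream and the input stream sit the intermediate DStreams, where the pattern is uniform: each compute asks its parent for the batch's RDD and applies its own transformation. MappedDStream, for instance, looks essentially like this in the Spark source (quoted for comparison, imports elided as in the other excerpts):

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // Ask the parent for this batch's RDD, then apply mapFunc to it
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}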

Each DStream first computes its parent DStream, which is the process of continuously building the RDD chain. Eventually we reach the ReceiverInputDStream class:

/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

The code above is the process that generates the very first RDD. Every function we call is applied to an RDD, producing a new RDD; each DStream holds its own RDD, and when we finally invoke an action, it operates on the RDD held by the last DStream!

Back to event handling: where do the job-generation events come from? Jobs are generated continuously, so there must be a timer constantly posting messages to the eventLoop (JobGenerator, line 58):

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

Here RecurringTimer receives a function of type callback: (Long) => Unit.

Inside RecurringTimer there is a thread that keeps posting job messages to the queue:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

It repeatedly triggers the event and calls back the function we passed in, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), i.e. it keeps posting messages to the queue (RecurringTimer, line 103):

/**
 * Repeatedly call the callback every interval.
 */
private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}

Let's see how triggerActionForNextInterval generates the job message:

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
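Putting the timer pieces together, a minimal self-contained imitation of RecurringTimer looks like this. This is my own sketch, using System.currentTimeMillis and Thread.sleep in place of Spark's Clock abstraction:

// A daemon thread that wakes up every `period` ms and invokes the callback
// with the scheduled tick time, keeping ticks aligned to the period.
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private var nextTime = 0L

  private val thread = new Thread("SimpleRecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val sleepMs = nextTime - System.currentTimeMillis() // stands in for waitTillTime
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime) // e.g. post GenerateJobs(new Time(nextTime))
          nextTime += period
        }
      } catch {
        case _: InterruptedException => // stopped
      }
    }
  }

  def start(firstTime: Long): Unit = { nextTime = firstTime; thread.start() }
  def stop(): Unit = { stopped = true; thread.interrupt() }
}

Started with, say, new SimpleRecurringTimer(5000, t => println(s"tick $t"), "demo").start(System.currentTimeMillis()), it prints one tick every five seconds, just as Spark posts one GenerateJobs event per batchDuration.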

To summarize: JobGenerator receives the various events posted by RecurringTimer, such as the job-generation event, and handles each kind of event separately. This approach reuses code and assigns different responsibilities to different modules, which both decouples the components and keeps the design modular.

Finally, one more question: how are jobs submitted to the cluster?

We return to JobGenerator's generateJobs method (line 241):

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // This is the job generation we analyzed above
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // What happens once the jobs are generated?
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

Here we see that once JobGenerator has generated the jobs, it hands them over to jobScheduler for processing:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // foreach is given a function that handles each job: job => jobExecutor.execute(new JobHandler(job))
    // Note that a thread pool is used to execute the jobs
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
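The jobExecutor here is a fixed-size daemon thread pool whose size comes from the configuration spark.streaming.concurrentJobs (default 1), which is why batches normally run one at a time. A plain-Java-executor equivalent of the hand-off, for illustration only:

import java.util.concurrent.Executors

val concurrentJobs = 1 // plays the role of spark.streaming.concurrentJobs
val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

// Hand each job to the pool, as submitJobSet does with JobHandler
Seq("job-0", "job-1").foreach { jobName =>
  jobExecutor.execute(new Runnable {
    override def run(): Unit = println(s"running $jobName") // stand-in for JobHandler.run
  })
}
jobExecutor.shutdown()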

Let's see how JobHandler processes the job we passed in:

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from $batchLinkText""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        // Notify JobScheduler that the job has started
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          // Finally job.run is called to execute the job. What does job.run do?
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          // Notify JobScheduler that the job has completed
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}

Let's trace the job.run method:

def run() {
  _result = Try(func())
}
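The key point is deferred execution: func is a zero-argument function, so nothing happens when the Job is constructed; the wrapped code runs only when run() is invoked on a jobExecutor thread. A minimal illustration with invented names:

import scala.util.Try

// Wrapping the work in () => Unit means construction is free; the wrapped
// code executes only when run() is called later on the executor thread.
class SimpleJob(func: () => Unit) {
  private var result: Try[Unit] = _
  def run(): Unit = { result = Try(func()) }
}

val job = new SimpleJob(() => println("submitting the RDD job now"))
// nothing printed yet ...
job.run() // only here does the wrapped code execute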

This executes a function called func. Where does this function come from, and what does it do?

As we analyzed earlier, the job generated in DStream only wraps a function and does not execute it. Let's return once more to DStream's generateJob:

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

That is, the func() in the generated job is exactly this function:

val jobFunc = () => {
  val emptyFunc = { (iterator: Iterator[T]) => {} }
  context.sparkContext.runJob(rdd, emptyFunc)
}
Invoking this function finally acts on the RDD and submits the job to the DAGScheduler:

/**
 * Run a job on all partitions in an RDD and return the results in an array.
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
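Note that even though emptyFunc does nothing with each partition's iterator, runJob still schedules one task per partition, which forces the whole RDD lineage for the batch to execute. A usage sketch, assuming an active SparkContext named sc:

// Forcing materialization with a no-op per-partition function:
// the map runs in 4 tasks even though the results are discarded.
val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
sc.runJob(rdd, (iter: Iterator[Int]) => {}) // analogous to the streaming job's emptyFunc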

We will stop tracing here; at this point the analysis of how jobs are generated and how they are submitted is complete.

The original post closes with a simple diagram of the dynamic job-generation flow.

Reposted from: https://my.oschina.net/u/1253652/blog/675260
