Home > Archives > Spark基础之Spark Streaming概述

Spark基础之Spark Streaming概述

Published on

Spark Streaming的核心是DStream或Discretized Stream(离散流),代表着连续的数据流。 然后按照指定的间隔(duration)对数据进行切分并进行相应的计算,主要用于实时场景。DStream在每一个切分的时间片段对应的就是一个RDD,本质上DStream就是连续的RDD(a sequence of RDDS)。它依赖的数据源是多种多样的,比如说Kafka, Flume; 当然输出数据的存储介质也是多种多样的,比如说HDFS,Databases。

它主要有以下几个重要的组件:

基本流程

下面我们以Kafka为数据源,构建DirectKafkaInputDStream(工作中主要是使用的是Direct方式),并进行简单操作。借此来解释一次简单的操作中,上述各个组件是如何协作的。

val conf = new SparkConf().setAppName("Kafka Direct Stream")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))

// Kafka Producer向话题nums发送随机英文字母A - Z
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val kafkaDirectStream =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("nums")
)

// 对于每个批次内出现字母的次数进行统计, 然后输出
val alphaCountDStream = kafkaDirectStream.map(_._2).map { alphabet =>
    alphabet -> 1
}.reduceByKey(_ + _)

alphaCountDStream.print()

ssc.start()
ssc.awaitTermination()

上述程序的逻辑非常简单,从Kafka中获取当前批次的字母,然后统计出现次数并且输出。大致可以分为三个阶段:

1. 构建StreamingContext并初始化一系列重要的组件

在StreamingContext初始化的同时,以下几个也同时被初始化了:

我们前面提到了DStreamGraph在初始化的时候inputStreams与outputStreams皆为空,但是在JobGenerator触发任务之前,DStream通过各种转换(map, filter等)和输出操作(output operation, like print),已经在DStreamGraph中注册了涉及到各种Input&OutStreams,所以为graph.generateJobs调用提供了基础 – 因为Job的产生是基于Stream的。

2. 构建DStream之间的依赖(dependency)

和RDD一样,DStream都会或多或少的产生依赖。每次的转换操作或是输出操作都会产生新的DStream,进而和原来DStream产生父子关系。最后print为DStream的输出操作,它只是将ForEachStream在DStreamGraph中注册为OutputStream和RDD action操作不一样的地方是,此处并没有触发ForEachStream的计算,真正的计算是发生在JobGenerator在指定时间点产生Job的时候

图1.DStream之间的依赖关系以及DStreamGraph的变化

3. 启动流式程序并且在指定时间点开始周而复始的计算

StreamingContext的启动,触发了JobScheduler以及JobGenerator的启动。

图2.StreamingContext启动之后生成任务并提交的流程

3.1 JobScheduler

JobScheduler启动的时候,内部的JobGenerator也启动了,它会按照指定间隔向JobScheduler提交任务(封装成了org.apache.spark.streaming.scheduler.JobSet)。之后JobScheduler便调用底层的线程池开始执行Job(jobExecutor.execute(new JobHandler(job)))。

3.2 JobGenerator

JobGenerator的主要任务是生成当前批次的JobSet,并且提交给JobScheduler执行,大致可以分为如下几个过程:

// org.apache.spark.streaming.scheduler.JobGenerator

private val timer = new RecurringTimer(
    clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), 
    "JobGenerator"
)

timer.start() -> thread.start()

// org.apache.spark.streaming.util.RecurringTimer
private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
}

private def loop() {
    ...
    while (!stoped) {
        triggerActionForNextInterval
    }
    triggerActionForNextInterval
    ...
}

def triggerActionForNextInterval() {
    ...
    clock.waitTillTime(nextTime)
    // longTime => eventLoop.post(GenerateJobs(new Time(longTime)))
    callback(nextTime)
    ...
}

代码片段1. JobGenerator循环提交任务的基本流程

3.3 DStreamGraph

DStreamGraph根据维护的OutputStream以及它的依赖关系开始生成最终任务所对应的RDD,在开篇已经提到过,DStream的计算本质上是RDD的计算

ForeachDStream不断调用父类DStream的compute方法生成相应的RDD,直到根类,在本例中也就是DirectKafkaInputDStream。当DirectKafkaInputDStream的compute方法被调用后生成Option[KafkaRDD],然后逐级返回,分别将子DStream所对应的函数施加在Option[KafkaRDD]上面,最终变成ShuffledRDD,并根据相应的缓存级别(storageLevel)缓存生成的RDD(newRDD.persist(storageLevel)),如果有需要也会进行相应的checkpoint(newRDD.checkpoint())

图3.DStreamGraph生成Job的过程

在上述生成ShuffledRDD过程中,RDD之间的依赖关系也已经形成,相当于DStreamGraph转换成了RDDGraph,为后面JobScheduler计算RDD做好了准备。

当JobScheduler接收到JobGenerator提交的任务之后,实际上就已经进入RDD的执行过程,在本例中就是打印出每个字母出现的次数。由于JobGenerator内部的triggerActionForNextInterval是定期运行的,所以生成JobSet,提交给JobScheduler运行的过程会一直执行下去

4. 批次计算完成之后的收尾工作

JobScheduler在完成任务(Job)的执行之后,主要进行下列几项工作:

总结

流式计算的本质是定时RDD的计算,也就是持续的RDD计算。整个计算就是DStreamGraph –> RDD Graph –> RDD Job的过程。涉及到两个层面的调度: JobSheduler用于DStream生成Job, DAGScheduler用于RDD的计算。

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方