Home > Archives > Spark Stateful Streaming剖析

Spark Stateful Streaming剖析

Published on

Spark Stateful Streaming的出现就是为了将不同的批次之间的状态关联起来,就像cookie和session将独立Http请求之间的状态关联起来一样,关于Spark Streaming的基础使用,可参照Spark基础之Spark Streaming概述,本文主要讲解mapWithState方法的使用、背后的实现以及容错过程中的一些思考。

下面以统计Kafka消息中词出现的次数来讲解整个过程,Kafka生产者不停的向Producer发送消息,Streaming定时获取并统计每个词的出现次数,如果发现出现次数高于多少则移除。

1.Kafka消息发送(kafka-console-producer)

/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count
AAAA
BBBB
AAAA
....

2.mapWithState方法涉及的类

首先它是PairDStreamFunctions[K, V]的一个方法,也就是说原始流一定要是DStream[K, V]的类型,才能被隐式转换成PairDStreamFunctions; 该方法的主要目的是对于流中的每一个Key, Value对调用相关的函数,同时维护流的状态以及生成相应的返回值,这样我们在调用mapWithState之后可以根据需要得到不同的返回值。

class PairDStreamFunctions[K, V](self: DStream[(K, V)])  {
    ....
}

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = { 
    ....
}

K: 转换流的Key类型,在本例中是String,也就是Kafka中的消息

V: 转换流的Value类型,在本例中是Int,单个单词出现次数

StateType: 状态类型,在本例中是Int,也就是单词出现次数汇总

MappedType: 对于转换流,在本例中是(String, Int),也就是(单词次数,出现次数汇总)

之所以叫转换流,是因为DirectKafkaInputDStream[K, V]中的K对应是Kafka消息中的key,一般为null;value对应的是Kafka消息中的value,一般是String。而这个消息中可能包含了丰富的内容,抑或是日志的一行,抑或是json string,所以一般都会经过简单处理形成新的DStream[K, V](转换流)之后才会调用mapWithState方法的。

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] extends Serializable {
    ...
    def initialState(rdd: RDD[(KeyType, StateType)]): this.type
    def numPartitions(numPartitions: Int): this.type
    def timeout(idleDuration: Duration): this.type
    ...    
}

StateSpec抽象了调用mapWithState的相关参数:

private[streaming] abstract class StateMap[K, S] extends Serializable {
    ...
    def get(key: K): Option[S]
    
    def getByTime(threshUpdatedTime: Long): Iterator[(K, S, Long)]
    
    def remove(key: K): Unit
    ...
}

Stateful streaming的状态维护的基本数据结构就是上面的StateMap,源码中的注释也体现了这一点: Internal interface for defining the map that keeps track of sessions

3.基本使用

经过上面的梳理,基本上对于mapWithState涉及的相关的类有一个了解,下面通过相应的代码来实现上文提到的word count,对于过期我们选择手动清理出现次数大于10次的。

// StateSpec.mappintFunction
private def mappingFunction(
    time: Time, key: String, valueOpt: Option[Int], wordCountState: State[Int]
): Option[(String, Int)] = {
    val thresholdCount = 10
    // key在StateMap中有值, 相当于调用StateMap.getOption(keyType).nonEmpty
    // 首先判断该key对应的次数, 如果大于10, 则移除
    if (wordCountState.exists) {
      // StateMap.get(keyType)
      val existingCount = wordCountState.get
    
      // 出现次数超过10次, 则移除
      if (existingCount > thresholdCount) {
         wordCountState.remove()
         None
      } else {
        val newCount = valueOpt.getOrElse(1) + existingCount
        // 更新状态
        wordCountState.update(newCount)
        Some(key -> newCount)
      }
    } else {
      // 首次出现, 则计1
      val newCount = valueOpt.getOrElse(1)
      wordCountState.update(newCount)
      Some(key -> newCount)
    }
}

从控制台显示来看,会发现某个单词,突然在下一次批次消失,比如下面的BBBBBBB:

Word AAAAAAAA, count 1
Word BBBBBBBBBBBBBB, count 2
Word BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB, count 1
Word BBBBBBBBBBBBBBBBBBBBBBBBBBBB, count 1
Word BBBBBBB, count 11

--------------------------- 

Word AAAAAAAA, count 1
Word BBBBBBBBBBBBBB, count 2
Word BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB, count 1
Word BBBBBBBBBBBBBBBBBBBBBBBBBBBB, count 1

4.状态更新

通过mapWithStream产生的流,它的状态更新主要是在MapWithStateRDD的compute方法中实现,核心实现为MapWithStateRDD.updateRecordWithData

case class MapWithStateRDDRecord[K, S, E](var stateMap: StateMap[K, S], var mappedData: Seq[E])

object MapWithStateRDDRecord {

    def updateRecordWithData(
        prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
        dataIterator: Iterator[(K, V)],
        mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
        batchTime: Time,
        timeoutThresholdTime: Option[Long],
        removeTimedoutData: Boolean
    ): MapWithStateRDDRecord[K, S, E] =  {
    
        // 通过克隆之前StateMap来创建新的map
        val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }
        
        val mappedData = new ArrayBuffer[E]
        
        dataIterator.foreach { case (key, value) =>
        
            // mappingFunction真正的调用处
            val returned = mappingFunction(batchTime, key, Some(value), wrappedState)      
            
        }
        
        // 超时记录处理
        if (removeTimedoutData && timeoutThresholdTime.isDefined) { 
            ....
        }
        
        MapWithStateRDDRecord(newStateMap, mappedData)
    }

}

MapWithStateRDDRecord实际上是和MapWithStateRDDPartition一一对应的,也就是一个MapWithStateRDDRecord实例就对应了一个分区的数据。这样设计的原因是因为,一般状态涉及到聚合操作,所以会对于某一个key的状态进行加总,比如说本例中Word出现次数,因此无法设计成一个Key-Value对应一个 MapWithStateRDDRecord实例

在上面的更新程序中,我们可以看到当前批次产生的分区即dataIterator,直接对于prevRecord中的key对应的状态进行了更新,那么Spark是如何保证它们俩中的key是在同一个分区的呢?答案就在于: 确保RDD[MapWithStateRDDRecord[K, S, E]](历史状态)和RDD[(K, V)]分区器(Partitioner)相同,这样就能保证相同Key落入相同的分区,给同分区更新提供了基础

class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag]( 
    ...
    private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
    private var partitionedDataRDD: RDD[(K, V)],
    ...
) {

    require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)

}

每个分区更新完成之后返回值为MapWithStateRDDRecord,所以如果我们需要获取当前批次处理之后的结果,则获取的是每个分区mappedData的汇总; 如果要获取每个批次完成之后,流的历史状态(snapshots),则获取的是每个分区stateMap的汇总,即Stream快照(snapshots)。

// RDD[MapppedType] 对应 RDD[(String, Int)]
dstream.mapWithState.foreachRDD { rdd: RDD[MapppedType] => 

}

// RDD[(K, StateType)] 对应 RDD[(String, Int)]
dstream.mapWithState.stateSnapshots().foreachRDD { rdd: RDD[(K, StateType)] =>

}

5.容错机制的一些思考以及实现

这里的容错机制主要是指: 流式程序的状态会不断积累,如何在程序宕机之后恢复到之前的状态,上例中就是如何保证之前WordCount的统计不丢失。

Spark(1.6)对于Stateful Streaming强制要求使用Checkpoint将流式状态(主要指的是底层RDD),默认是10倍的slideDuration。但是目前这种实现有个问题:

class InternalMapWithStateDStream {
    // 默认的StorageLevel
    persist(StorageLevel.MEMORY_ONLY)
    
    override def initialize(time: Time): Unit = {
        ...
        checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
        ...
    }
    ...
}

如果重新运行的时候,代码发生了改变,可能会导致相应的反序列化问题,所以之前的状态变无法恢复; 如果删除的话,之前的状态以及消息的消费也可能会丢失。这对于不断更新业务逻辑的项目而言是不可接受的。

所以我们可以尝试手动维护Kaka偏置,同时对于带状态的流,我们需要定期导出Snapshots作为流状态的初始值,具体实现需要考虑的一些问题:

class DirectKafkaInputDStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
  ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]
) extends Serializable {

  private val logger = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

  // commit offset at every interval
  def build() = {
    require(kafkaParams.contains("group.id"), "Consumer group id should not be empty")
    require(kafkaParams.contains("zookeeper.connect"), "Zookeeper connect should not be empty")

    val consumerGroupId = kafkaParams.get("group.id").get
    val zkConnect = kafkaParams.get("zookeeper.connect").get

    val zkClient = ZookeeperClient.connect(zkConnect)
    val getFromOffsetsFunc: ZookeeperClient => Map[TopicAndPartition, Long] = _.getFromOffsets(consumerGroupId, topic)

    val directKafkaInputDStream =
      CommonUtils.safeRelease(zkClient)(getFromOffsetsFunc)() match {
        case Success(storedFromOffsets) =>
          // try to get from offset from zookeeper or create a new one
          val directKafkaInputDStream =
            if (storedFromOffsets.nonEmpty) {
              val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
              KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
                ssc, kafkaParams, storedFromOffsets, messageHandler
              )
            } else {
              logger.info(s"Create direct stream with topics: ${topic}")
              KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, Set(topic))
            }

          directKafkaInputDStream
        case Failure(e) =>
          logger.error(s"Exception when creating DirectKafkaInputDStream", e)
          throw e
      }

    directKafkaInputDStream.foreachRDD { (rdd, time) =>

      val zkClient = ZookeeperClient.connect(zkConnect)
      CommonUtils.safeRelease(zkClient)(
        _.commitFromOffset(consumerGroupId, rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
      )()
    }

    directKafkaInputDStream
  }

}

等到获取当前批次的状态之后,根据相应的逻辑决定是否检出,

def periodicSnapShotDump[T](
    rdd: RDD[T], startTime: Long, currentTime: Long
  ): Unit = {
    val sparkConf = rdd.context.getConf

    for {
      snapShotDir <- sparkConf.getOption(STATE_SNAPSHOT_DIR)
      snapShotDuration <- sparkConf.getOption(STATE_SNAPSHOT_CP_DURATION)
    } yield {
      val partitionNumber = sparkConf.getInt(STATE_SNAPSHOT_PARTITION, DEFAULT_CHECKPOINT_DURATION_MULTIPLIER)

      if (currentTime != startTime && (currentTime - startTime) % ScalaDuration(snapShotDuration).toMillis == 0) {
        cleanCheckpoint(rdd.context, snapShotDir)
        logger.info(s"Periodic checkpointing data into ${snapShotDir}")
        rdd.repartition(partitionNumber).saveAsObjectFile(snapShotDir)
      }
    }
}

本文实现的完整版,可参考KafkaWordCounter.scala。不过还有两个问题没有找到很好的解决方法:

所以计划在阅读«大数据系统构建:可扩展实时数据系统构建原理与最佳实践»这本书之后,尝试使用Lambda架构结合Redis来解决。

参考

> Exploring Stateful Streaming

> Spark Streaming中Driver的容灾

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方