Home > Archives > Spark Streaming中关于exactly-once语义的思考

Spark Streaming中关于exactly-once语义的思考

Published on

Spark Streaming概述中主要介绍了Streaming的基本概念以及DirectKafkaInputDStream的执行流程。在流式计算中,还有一个比较重要的概念就是端对端exactly-once语义,在Spark Streaming中也就是说需要确保消息只被消费一次,并且只对外输出一次。

在DirectKafkaInputDStream中,我们需要考虑如下问题:

1.消费一致性

1.1 存储的数据

因为KafkaStream计算的时候才会去从Kafka中进行消费,所以Checkpoint的结构只需要保存offset即可,并且该数据结构需要能不断更新,在DirectKafkaInputDStream中,该数据结构为可变的HashMap。

class DirectKafkaInputDStreamCheckpointData {
    ...
    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
    }
    ...
}

OffsetRangeTuple维护的是某个话题某个分区的消费起始Offset和结束Offset。

1.2 Checkpoint的时机

如果需要开启Checkpoint机制,首先需要在设置相应的Checkpoint dir。

ssc.checkpoint(checkpointDirectory)

通过存储Offset的机制保证了即使任务执行失败,当前batch也可以再次执行。

1.3 恢复的时机

在StremingContext初始化的时候,也会同时初始化StreamingGraph,这个时候就会尝试去从checkpoint中去恢复数据

cp_.graph.restoreCheckpointData()

2.输出幂等

上面提到过,如果Offset被checkpoint之后,任务执行失败了,由于Spark是分partition计算的,可能一部分数据已经落入存储了,那么batch重新计算的时候,就可能会产生重复数据,所以需要我们自己手动控制。

2.1 ElasticSearch

在ES中,如果ID相同,则会直接更新,借助这一特性,我们可以为每一条记录分配唯一的ID,这样即使batch重算,也不会出现重复数据。

2.2 数据库

由于我们在批次中checkpoint了相关的信息,比如说批次时间,批次对应的offset等,所以在重算或者恢复的时候,这些信息依然可以获得,结合partitionId便可以构成当前批次的唯一标识。

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // 带事务的操作,如果uniqueId对应的记录不存在,则插入,反之则放弃当次操作
    insertIfNotExists(batchRecords)
  }
}

最后的话,对于Spark Streaming宣称的exactly-once语义,从实现上来看实际上是不满足的, 首先消息会重复消费,其次消息也是会被重复处理的。所以它最多能保证at-least once语义。在分布式系统中,exactly-once delivery是一大难题,但是在Kafka 0.11版本中实现了该机制并且很好的融入到Kafka Stream中去了。

参考

> Spark Streaming消费Kafka数据的两种方案

> You can not have exactly once delivery

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方