
Spark Basics: The EventLoop Implementation

Many modules inside Spark abstract the actions they need to perform into events; in Spark Streaming, for example, JobGenerator describes its work with JobGeneratorEvent subtypes such as GenerateJobs.

Once these events are produced, something has to handle them. Spark uses EventLoop to decouple producing an event from handling it: a component that needs to react to events typically holds an EventLoop member variable, initializes it when the component itself starts, and defines the handling logic for each kind of event internally.

The EventLoop Implementation

EventLoop consists mainly of the following parts:

Figure 1. The components of EventLoop
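
Since the figure is not reproduced here, the following condensed sketch of org.apache.spark.util.EventLoop may help. It is paraphrased rather than copied verbatim from the Spark source: logging and part of the error handling are omitted, and details may differ slightly between Spark versions.

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

abstract class EventLoop[E](name: String) {

  // Events posted by callers are buffered here until the event thread consumes them.
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  // A single daemon thread keeps taking events off the queue and dispatching them
  // to onReceive; take() blocks while the queue is empty.
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts this thread to end the loop
      }
    }
  }

  def start(): Unit = {
    onStart()            // let the subclass initialize before events start flowing
    eventThread.start()
  }

  def post(event: E): Unit = eventQueue.put(event)

  def stop(): Unit = {   // simplified; the real version is shown in step (3) below
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
      onStop()
    }
  }

  // Hooks that the concrete component implements or overrides
  protected def onStart(): Unit = {}
  protected def onStop(): Unit = {}
  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}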

Let's use the EventLoop inside JobGenerator (Spark Streaming) to walk through the flow above:

(1) Starting the EventLoop

When JobGenerator starts, its EventLoop is actually assigned (null -> new EventLoop) and started; the internal eventThread then begins taking events from eventQueue, blocking when the queue is empty.
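
The relevant part of JobGenerator.start() looks roughly like this (condensed; checkpoint-related handling is omitted and the exact code may differ between Spark versions):

def start(): Unit = synchronized {
  if (eventLoop != null) return  // generator has already been started

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    // The concrete handling logic lives in JobGenerator.processEvent (see below)
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()  // the eventThread now blocks on the still-empty queue

  // ... then either startFirstTime() or restart(), depending on checkpointing
}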

(2) Posting a GenerateJobs event at each scheduled time

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The eventThread takes the event from eventQueue and calls onReceive, which in turn invokes JobGenerator's processEvent method, where the concrete handling logic for each event type lives.

private def processEvent(event: JobGeneratorEvent) {
  event match {
    ...
    case GenerateJobs(time) => generateJobs(time)
    ...
  }
}

(3) Stopping the internal EventLoop when JobGenerator shuts down

Stopping the EventLoop boils down to stopping the internal eventThread and then calling the onStop() method implemented by the subclass.

def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
        eventThread.interrupt()
        try {
            eventThread.join()
            onStop()
            ....
        } catch {
            case ie: InterruptedException =>
              // Re-interrupt so that callers further up the stack and the
              // thread group can still see the interrupt status
              Thread.currentThread().interrupt()
        }
    }
}
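
On the JobGenerator side the shutdown is symmetric: after the timer and the DStream graph have been stopped (and, for a graceful stop, the pending batches have been drained), the last thing JobGenerator.stop() does is stop the event loop. Roughly (condensed, and subject to version differences):

def stop(processReceivedData: Boolean): Unit = synchronized {
  if (eventLoop == null) return  // generator has already been stopped

  // ... stop the timer and the DStream graph; if the stop is graceful,
  // wait for the already-received data to be processed first ...

  eventLoop.stop()  // interrupts and joins the eventThread, then calls onStop()
}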

A practical application

A common need in development and testing, especially with Spark Streaming, is to have Kafka send messages on a fixed schedule, say once every 5 seconds. The usual implementation looks like this:

def produceRecord() {}

while (true) {
    produceRecord()
    Thread.sleep(5000)
}

This works, but it is not very elegant. Instead, we can use org.apache.spark.streaming.util.RecurringTimer together with org.apache.spark.util.EventLoop to build a small program that sends Kafka messages on a schedule. Both classes are package-private in Spark, so in our own project we declare the file under package org.apache.spark.streaming, which makes both of them accessible.

The concrete implementation:

package org.apache.spark.streaming

import java.util.Properties
import java.util.concurrent.locks.ReentrantLock
import scala.util.Random
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{EventLoop, SystemClock}

// Behaviors related to KafkaMessageProducer, encapsulated as event case classes
sealed trait KafkaMessageEvent
case class GenerateMessages(time: Time) extends KafkaMessageEvent

object KafkaMessageProducer {

  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()

  var eventLoop: EventLoop[KafkaMessageEvent] = null
  var producer: KafkaProducer[String, String] = null

  def start(): Unit = {
    eventLoop = new EventLoop[KafkaMessageEvent]("MessageGenerator") {
      override protected def onReceive(event: KafkaMessageEvent): Unit = processEvent(event)
      override protected def onError(e: Throwable): Unit = throw e
    }
    eventLoop.start()
    producer = getKafkaProducer()

    val startTime = new Time(timer.getStartTime())
    timer.start(startTime.milliseconds)
  }

  def getKafkaProducer() = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  // execution interval
  val durationInMillis = Seconds(5).milliseconds
  val timer = new RecurringTimer(
      new SystemClock, durationInMillis,
      longTime => eventLoop.post(GenerateMessages(new Time(longTime))), "MessageGenerator"
  )

  // called by EventLoop's onReceive to handle the logic
  def processEvent(kafkaMessageEvent: KafkaMessageEvent) = {
    kafkaMessageEvent match {
      case GenerateMessages(time) => sendMessage(producer, time)
    }
  }

  def sendMessage(producer: KafkaProducer[String, String], time: Time) = {
     val alphabets = List("A", "B", "C", "D")
     Random.shuffle(alphabets).take(1).foreach { alpha =>
       val producerRecord: ProducerRecord[String, String] = new ProducerRecord("alphabets", alpha)
       producer.send(producerRecord)
     }
  }

  def main(args: Array[String]): Unit = {
      start()
      waitTillEnd()
  }

  // Block the main thread indefinitely (the condition is never signalled),
  // so the program keeps running and the timer keeps firing
  def waitTillEnd(): Unit = {
    lock.lock()
    try {
      condition.await()
    } finally {
      lock.unlock()
    }
  }

}
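
To try it out, point bootstrap.servers at a reachable Kafka broker (the sketch assumes localhost:9092) and make sure the alphabets topic exists or topic auto-creation is enabled. Running the object then publishes one random letter to the topic roughly every five seconds, while the main thread stays parked in waitTillEnd() until the process is killed.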

The code itself is nothing special; what matters is the two ideas behind it, abstracting behaviors into events and processing them through an EventLoop, which we can borrow and apply in our own programs.

Note: this article is licensed under BY-NC-SA. When reposting, please credit the source: Allen写字的地方