Home > Archives > 从Future产生到结果收集

从Future产生到结果收集

Published on

Future在异步编程中使用非常广泛,Scala中的Actor就是建立在Future之上的,而Actor又在Akka和Spark中被广泛使用,重要性不言而喻。对于Future的语法和基本使用,网上已经有太多的资源了。本文想以源码为基础,就Future的结果收集进行剖析,不求多么深刻,但求能对结构收集的每一步有个大致的了解。

A Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking).

Future的结果 - 非阻塞式 vs 阻塞式收集

val sumResult = Future((1L to 100000000L).sum)
非阻塞式的取值(onComplete onSuccess onFailure)
sumResult onComplete {
  case Success(s) => println(" Future succeed " + sumResult)
  case Failure(f) => println(" Future failed")
}

阻塞式的取值(Await.result)

Await.result(sumResult, Duration.Inf)

使用Future的初衷是为了不阻塞,所以尽量不要使用阻塞式取值,除非你不得不这么做。在项目开发中,一般通过For ... Yield将多个Future组合起来,最后调用Await.result获取最终的结果。

for {
    A <- Future[A]
    B <- Future[B]
    C <- Future[C]
} yield {
    op(A, B, C)
}

针对阻塞式的取值,以上面的代码为例就会阻塞主线程:

package scala

// STEP1
package concurrent {
    object Await {
       def result[T](awaitable: Awaitable[T], atMost: Duration): T =
        blocking(awaitable.result(atMost)(AwaitPermission))
    }
}

// STEP2 
package object concurrent {
   def blocking(body: => T) = 
       BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
}


// STEP3
package scala.concurrent

object BlockContext {

    private val contextLocal = new ThreadLocal[BlockContext]()
    
    def current = contextLocal.get {
        case null => Thread.currentThread match {
            case ctx: BlockContext => ctx
            case _ => defaultBlockContext
        }
        case some => some
    }

}

在最初阅读current方法时候会疑惑,为什么Thread.currentThread会匹配BlockContext类型,源码中的解释是: 有些线程池实现的时候会让Thread继承BlockContext。

对于非阻塞式的取值,我们可以通过Future的相关方法来判断它的当前状态。

sumResult.isCompleted

显然onCompleteonSuccessonFailure这些回调也是根据当前Future的状态来执行相应的操作的。那么Future内部的状态有哪几种呢?这些状态之间又是如何切换的呢?

1.Future的状态以及改变

要研究Future的状态变化,最好的方式就是利用onComplete的调用栈并且打断点来摸清整个调用过程,下面通过代码调用来一步一步解释(由于之前没有接触过UML图,所以可能存在一些纰漏,以下涉及到UML图的地方仅供参考)。

Future的初始化和结果收集UML

1.1 Future的初始化 – Future.apply

Future初始化的时候(实际上是DefaultPromise的初始化),会调用AbstractPromise的updateState方法来设置初始状态。

class DefaultPromise[T] extends AbstractPromise { self =>
  updateState(null, Nil) // 实际上更新的是AbstractPromise中的_ref属性
}

所以,Future初始化之后状态变成了Nil(类型实际上是List[CallbackRunnable])

1.2 Future中的任务开始执行 – PromiseCompletingRunnable

// executor ---> package scala.concurrent.impl.ExecutionContextImpl    
  def apply[T](body: => T)(implicit executor ExecutionContext): 
     scala.concurrent.Future[T] = {
     /*
        这里也体现了Promise的语义执行Runnable并且返回Future
        每次启动一个异步计算的时候,PromiseCompletingRunnable都会被创建一次
        body是懒加载的,所以在调用的时候才会被执行
     */
     val runnable = new PromiseCompletingRunnable(body)
     
     // 异步计算从这个地方开始然后调用PromiseCompletingRunnable的run方法
     executor.prepare.execute(runnable) 
            
     // 返回Future,实际上是DefaultPromise实例,以便后续使用,比如说Compose
     runnable.promise.future
  }
}

而在PromiseCompletingRunnable执行的时候,会将执行结果封装在Try[T]中,所以onComplete的回调函数的参数也是Try[T]类型的。

class PromiseCompletingRunnable[T](body: => T) {
   val promise = new Promise.DefaultPromise[T]()
   override def run() = {
      promise complete {
          try {
            Success(T)
          } catch {
            case NonFatal(e) => Failure(e)
          }
      }
   }
}

1.2.1 DefaultPromise.complete

// 如果该Promise已经完成则报错,否则就返回DefaultPromise的当前实例
def complete(result: Try[T]): this.type = 
  if (tryComplete(result)) this 
  else throw IllegalStateException("Promise already completed.") 

1.2.2 DefaultPromise.tryComplete

// tryComplete返回true则说明没有完成,返回false则说明完成
def tryComplete(result: Try[T]) = {
  tryCompleteAndGetListeners(result) {
     case null => false
     case rs if rs.isEmpty => true
     case rs => rs.foreach(r => r.executeWithValue(result)); true;
  }
}

1.2.3 DefaultPromise.tryCompleteAndGetListeners

@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): 
  List[CallbackRunnable[T]] = {
  getState match {
    case raw: List[_] =>
      val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
      if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
      
    case _: DefaultPromise[_] =>
      compressedRoot().tryCompleteAndGetListeners(v)
    case _ => null
  }
}

前面提到过,DefaultPromise的状态是由AbstractPromise_ref属性维护的。状态有以下几种:

1.3 Future注册回调 – Future.onComplete

Future的异步调用体现在当Future初始化之后,一个异步计算就已经开始;一般情况,我们会为Future注册相关的回调,要么是对正常的返回值进行处理,要么是对异常进行处理。同样以源码入手。

sumResult onComplete {
  case Success(s) => println(" Future succeed " + sumResult)
  case Failure(f) => println(" Future failed")
}

简化版的调用链如下:

dispatchOrAddCallback

1.3.1 DefaultPromise.onComplete

def onComplete[U](func: Try[T] => U)
    (implicit executor: ExecutionContext): Unit = {
    ...
    val runnable = new CallbackRunnable(executor, func) // 1.1
    dispatchOrAddCallback(runnable) // 1.2
}

1.3.2 scala.concurrent.impl.CallbackRunnable

因为Java中Runnable的实现(run方法)默认是返回空的,所以此处基于Runnable再实现了一个,主要的目的是为了注入回调,让Future完成之后执行该回调。

class CallBackRunnable(
      val executor: ExecutionContext, val onComplete: Try[T] => Any) 
     extends Runnable with OnCompleteRunnable {
     var value: Try[T] = null
     
     override def run() = {
        // 与executeWithValue中require相结合,执行run的时候value必须不为null
        require(value ne null) 
        
        try {
          // case Success(s) => println(" Future succeed " + sumResult)
          // case Failure(f) => println(" Future failed")
          onComplete(value)
        } catch {
          case NoFatal(e) => ....
        }
     }
     
    def executeWithValue(v: Try[T]) = {
        
        // 确保回调函数没有被多次执行
        require(value eq null) 
        value = v
        
        try executor.execute(this) 
        catch {
          case NonFatal(e) => ....
        }
     }

}

1.3.3 DefaultPromise.dispatchOrAddCallback

这个方法的主要目的是获取Future的状态,从而决定是执行回调还是将回调转移给根Promise。注意这个过程中Future的执行会导致Promise的状态发生变化,也就是前面提到的PromiseCompletingRunnable中的run方法。 以sumFuture为例。

private def dispatchOrAddCallback(runnable: Runnable): Unit = {
    getState match {
      case r: Try[_] => 
         runnable.executeWithValue(r.asInstanceOf[Try[T]])
      
      case _: DefaultPromise[_] => 
         compressRoot().dispatchOrAddCallback(runnable)
      
      case listeners: List[_] => 
         if (updateState(listeners, runnable :: listeners)) () 
         else dispatchOrAddCallback(runnable)
    }
  }

前面提到过,DefaultPromise的状态有三种情况,在执行回调函数的线程上(Future运行和回调执行是不同的线程)通过取Future的状态来决定如何操作

至此,一个最基本的Future初始化以及注册回调并收集结果的流程就走完了(对于,状态为DefaultPromise的情况会在后续的更新中完成)。

以下是本文的一些汇总:

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方