Home > Archives > 【Concurrency】Java Future实现剖析

【Concurrency】Java Future实现剖析

Published on

1. 是什么

Future的出现与异步计算是密不可分的,它代表着异步计算的结果。同时提供了检测计算是否完成阻塞等待完成以及取回计算完成的结果等方法。它的核心实现可以概括为如下几点:

future-task-struture

2. 核心方法

public interface Future<V> {
    
    /*
        对于正在运行的任务进行取消
    */
    boolean cancel(boolean mayInterruptIfRunning);

    
    /**
        是否已经完成
    */
    boolean isDone();
    
    /*
        是否已经取消
    */
    // 线程如果已经运行结束了,isCancelled 和 isDone 返回的都是 true
    boolean isCancelled();
    
    /*
        阻塞等待计算结果
        如果任务被取消了,抛CancellationException 异常
        如果等待过程中被打断了,抛 InterruptedException 异常
    */
    V get() throws InterruptedException, ExecutionException;
    
    /**
        带超时的等待任务执行完成,如果超时时间外仍然没有响应,抛 TimeoutException 异常
    */
    V get(long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;

}

由于Future是一个接口,那我们以它最常用的实现类FutureTask,一般我们提交到线程池的Runnable就会封装成这个类型来看看它核心方法的大致实现。

3. FutureTask – 共享执行状态 + WaitNode

由于线程池提交任务的时候,有的期望有返回值(submit(Callable)),有的不需要有返回值(submit(Runnable))。所以需要将这两者统一起来: 设计一个兼具运行任务以及适当的时候拿回返回值的数据结构,于是有了RunnableFuture,它继承了Runnable和Future, 而FutureTask则实现了RunnableFuture。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    
    void run();

}

// ThreadPoolExecutor
public Future<?> submit(Runnable task) {
    RunnableTask<Void> futureTask = new FutureTask<>(task, null);
    execute(ftask);
    return futureTask;
}

3.1 重要属性

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    
    private Callable<V> callable;
    
    private Object outcome;

    private volatile Thread runner;

}
属性 说明
state 当前任务的状态,最开始的是NEW,执行完成是NORMAL; 由于涉及到不同的线程去读写,所以使用volatile修饰当然具体操作的时候结合CAS确保线程安全
callable 底层待运行任务的抽象,如果是Runnable会通过RunnableAdapter进行转换,最终执行是调用它的call方法
outcome 异步执行的结构(具体的值或者是异常),主要通过volatile的state来保护读写,也就是state决定什么时候可以写,什么时候读
runner 当前正在执行任务的线程,CAS设置

3.2 run() – CAS Runner & 执行任务 & 更新state

以下面这个线程池任务提交来看看FutureTask的run方法做了哪些工作?

ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {
    System.out.println("Test FutureTask");
});
public void run() {
    // 初始化的时候已经将state装改设置了成NEW另外如果尝试CAS设置当前线程会执行者失败,所以已经有线程在执行
    if (
        state != NEW ||
        !UNSAFE.UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
    ) {
        return;
    }
    
    try {
        Callable c = callable;
        boolean successful;
        
        try {
            
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                // boolean ran;
                boolean succesful;
                try {
                    // 调用执行 System.out.println("Test FutureTask");
                    result = c.call();
                    succesful = true;
                } catch (Throwable ex) {
                    result = null;
                    succesful = false;
                    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                        outcome = ex;
                        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                        finishCompletion();
                    }
                }
                
                if (successful) {
                    // 任务已经执行完成,CAS设置state为COMPLETING成功
                    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                        outcome = v;
                        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                        finishCompletion();
                   }
                }
            } finally {
            
            }
        }
        
    }

}

3.2 get(timeout, Timeunit) – 等待执行结果处理,最多多长时间

public V get(long timeout, TimeUnit unit) {
    
    int currentState = state;
    if (currentState <= COMPLETING) {
        currentState = awaitDone(true, unit.toNanos(timeout));
        if (currentState <= COMPLETING) {
            throw new TimeoutException();
        }
    }
    
    return report(currentState)
    
}

这个实现的核心就是借助Lock.support来挂起线程以及WaitNode维护当前正在等待获取结果的线程。

3.3 cancel – 取消任务执行

由于底层执行任务的是线程,所以cancel的实现实际上就是打断Thread。只有在当前FutureTask状态=NEW且CAS成功设置线程状态为INTERRUPTING(如果执行中允许打断就是设置成该状态)或CANCELLED才能进行真正打断。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (
        state == NEW  && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
    ) {
        // 进行取消操作,打断可能会抛出异常,选择 try finally 的结构
        try {
            // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null) {
                        t.interrupt();
                    }
                } finally { 
                    // final state
                    //状态设置成已打断
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 清理线程,如果有线程在等待获取状态,则依次唤醒
            finishCompletion();
        }

        return true;
    } else {
        return false;
    }
}

参考

> 面试官系统精讲Java源码及大厂真题 28 Future、ExecutorService源码解析

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方