Java任务执行框架源码分析

本文源码基于JDK11

任务

无返回值的任务,由Runnable接口来描述,通过实现run方法来指定任务逻辑

package java.lang;

@FunctionalInterface
public interface Runnable {  
    public abstract void run();  
}

有返回值的任务,由Callable接口来描述,通过实现call方法来指定任务逻辑

package java.util.concurrent;

@FunctionalInterface  
public interface Callable<V> {  
    V call() throws Exception;  
}

Executors工具类,通过RunnableAdapter实现RunnableCallable

private static final class RunnableAdapter<T> implements Callable<T> {  
    private final Runnable task;  
    private final T result;  
    RunnableAdapter(Runnable task, T result) {  
        this.task = task;  
        this.result = result;  
    }  
    public T call() {  
        task.run();  
        return result;  
    }  
    public String toString() {  
        return super.toString() + "[Wrapped task = " + task + "]";  
    }  
}

Executors工具类,RunnableCallable工具方法

public static Callable<Object> callable(Runnable task) {  
    if (task == null)  
        throw new NullPointerException();  
    return new RunnableAdapter<Object>(task, null);  
}

public static <T> Callable<T> callable(Runnable task, T result) {  
    if (task == null)  
        throw new NullPointerException();  
    return new RunnableAdapter<T>(task, result);  
}

任务执行回执

Future接口抽象出的概念类似任务执行回执,在任务提交到任务执行器后,任务执行器返回这个回执。通过这个回执,我们可以在未来某个时刻查看任务是否已执行完成,获取任务的执行结果,或取消任务的执行

package java.util.concurrent;

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

任务与任务执行回执合一

RunnableFuture接口同时继承了Runnable接口和Future接口,RunnableFuture接口的实现类需要实现在任务完成后能够取出任务的执行结果

package java.util.concurrent;

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTaskRunnableFuture接口的主要实现类,下面是FutureTask重点部分代码的分析

package java.util.concurrent;

// 实现RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V> {
    //...
}

Runnable实现部分

// 任务执行状态
private volatile int state;

// 任务执行状态常量
private static final int NEW          = 0;  
private static final int COMPLETING   = 1;  
private static final int NORMAL       = 2;  
private static final int EXCEPTIONAL  = 3;  
private static final int CANCELLED    = 4;  
private static final int INTERRUPTING = 5;  
private static final int INTERRUPTED  = 6;

// 执行任务的线程
private volatile Thread runner;
// 要执行的任务
private Callable<V> callable;
// 任务结果
private Object outcome;
public void run() {  
    if (state != NEW ||  
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))  
        // 任务正被其它线程执行,或已被执行过了,则退出
        return;  
    try {  
        Callable<V> c = callable;  
        if (c != null && state == NEW) {  
            V result;  
            boolean ran;  
            try {  
                // 执行任务
                result = c.call();  
                ran = true;  
            } catch (Throwable ex) {  
                result = null;  
                ran = false;  
                // 执行任务异常,处理任务结果
                setException(ex);  
            }  
            if (ran)  
                // 正常执行任务,处理任务结果
                set(result);  
        }  
    } finally {  
        runner = null;
        // 处理中断
        int s = state;  
        if (s >= INTERRUPTING)  
            // 仅确保cancel方法发出中断信号时,当前线程尚未从run方法返回
            // 不负责重置线程中断信号
            handlePossibleCancellationInterrupt(s);  
    }  
}
protected void set(V v) {  
    // 转换任务状态为COMPLETING
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {  
        // 任务的执行未被取消或中断
        // 任务执行结果放到outcome
        outcome = v;  
        // 最终将任务状态转换为NORMAL,不保证其它线程即时可见
        STATE.setRelease(this, NORMAL);
        // 唤醒所有等待执行结果的线程
        finishCompletion();
    }  
}
protected void setException(Throwable t) {  
    // 转换任务状态为COMPLETING
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {  
        // 任务的执行未被取消或中断
        // 任务执行异常放到outcome
        outcome = t;  
        // 最终将任务状态转换为EXCEPTIONAL,不保证其它线程即时可见
        STATE.setRelease(this, EXCEPTIONAL);
        // 唤醒所有等待执行结果的线程
        finishCompletion();  
    }  
}
private void finishCompletion() {  
    // assert state > COMPLETING;  
    for (WaitNode q; (q = waiters) != null;) {  
        // 等待任务执行结果的线程链表不为空
        if (WAITERS.weakCompareAndSet(this, q, null)) {  
            // CAS修改waiters为空成功,确保只有一条线程进行等待线程的唤醒工作
            // 遍历等待线程链表
            for (;;) {  
                Thread t = q.thread;  
                if (t != null) {  
                    q.thread = null;  
                    // 唤醒线程
                    LockSupport.unpark(t);  
                }  
                WaitNode next = q.next;  
                if (next == null)  
                    break;  
                q.next = null;
                q = next;  
            }  
            break;  
        }  
    }  
  
    done();  
  
    callable = null; 
}

Future实现部分

// 任务执行结果等待线程链表
private volatile WaitNode waiters;

// 等待线程链表节点
static final class WaitNode {  
    // 等待线程
    volatile Thread thread;  
    // 链表下一个节点
    volatile WaitNode next;  
    WaitNode() { thread = Thread.currentThread(); }  
}
public V get() throws InterruptedException, ExecutionException {  
    int s = state;  
    if (s <= COMPLETING)  
        // 任务执行未完成,线程进入waiting状态
        s = awaitDone(false, 0L);  
    // 返回任务执行结果
    return report(s);  
}
public V get(long timeout, TimeUnit unit)  
    throws InterruptedException, ExecutionException, TimeoutException {  
    if (unit == null)  
        throw new NullPointerException();  
    int s = state;  
    if (s <= COMPLETING &&  
        // 任务执行未完成,线程进入timed waiting状态
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)  
        // awaitDone方法返回时任务执行仍未完成,即等待超时,抛出异常
        throw new TimeoutException();  
    // 返回任务执行结果
    return report(s);  
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {  
    long startTime = 0L;
    WaitNode q = null;  
    boolean queued = false;  
    for (;;) {  
        int s = state;  
        if (s > COMPLETING) {  
            // 任务已执行完成,且已看见最终任务执行状态的更新
            if (q != null)  
                q.thread = null;  
            return s;  
        }  
        else if (s == COMPLETING)  
            // 任务已执行完成,未看见最终任务执行状态的更新,继续等待
            Thread.yield();  
        else if (Thread.interrupted()) {  
            // 任务未执行完成
            // 且线程已被中断,当前线程从等待任务执行结果链表中移除,抛出异常并重置中断状态
            removeWaiter(q);  
            throw new InterruptedException();  
        }  
        else if (q == null) {  
            // 任务未执行完成且线程未被中断
            // 且尚未为当前线程创建WaitNode对象
            if (timed && nanos <= 0L)  
                // 当前线程等待任务执行结果是有限期的
                // 且设置的超时时间小于等于0,即已超时,直接返回
                return s;  
            // 当前线程等待任务执行结果是无限期等待的,或有限期等待且设置的超时时间大于0
            // 为当前线程创建WaitNode对象
            q = new WaitNode();  
        }  
        else if (!queued)  
            // 任务未执行完成且线程未被中断
            // 且已为当前线程创建WaitNode对象
            // 且该WaitNode对象尚未加入等待线程链表
            // CAS尝试将WaitNode对象加入等待线程链表
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);  
        else if (timed) {  
            // 任务未执行完成且线程未被中断
            // 且已为当前线程创建WaitNode对象
            // 且该WaitNode对象已加入等待线程链表
            // 且当前线程等待任务执行结果是有限期的
            final long parkNanos;  
            if (startTime == 0L) {
                // 未设置线程等待起始时间,当前线程准备首次转换为timed waiting状态
                // 设置线程等待起始时间
                startTime = System.nanoTime();  
                if (startTime == 0L)  
                    // 防止系统时间错误
                    startTime = 1L;  
                parkNanos = nanos;  
            } else {  
                // 已设置线程等待起始时间,当前线程已从timed waiting状态被唤醒
                // 计算线程等待时间
                long elapsed = System.nanoTime() - startTime;  
                if (elapsed >= nanos) {  
                    // 等待时间已等于或超出预期,当前线程从等待任务执行结果链表中移除,并返回
                    removeWaiter(q);  
                    return state;  
                }  
                // 等待时间未到预期,继续等待剩余时间
                parkNanos = nanos - elapsed;  
            }  
            if (state < COMPLETING)  
                // 检查当前任务执行最新状态,若任务未执行完成,则当前线程转换为timed waiting状态
                LockSupport.parkNanos(this, parkNanos);  
        }  
        else  
            // 任务未执行完成且线程未被中断
            // 且已为当前线程创建WaitNode对象
            // 且该WaitNode对象已加入等待线程链表
            // 且当前线程等待任务执行结果是无限期的
            // 当前线程转换为waiting状态
            LockSupport.park(this);  
    }  
}
private V report(int s) throws ExecutionException {  
    Object x = outcome;  
    if (s == NORMAL)  
        // 任务执行最终状态为NORMAL,即正常完成,返回执行结果
        return (V)x;  
    if (s >= CANCELLED)  
        // 任务执行最终状态为CANCELLED,INTERRUPTING,INTERRUPTED,抛出CancellationException
        throw new CancellationException();  
    // 任务执行最终状态为EXCEPTIONAL,再次抛出执行任务时抛出的异常
    throw new ExecutionException((Throwable)x);  
}
public boolean cancel(boolean mayInterruptIfRunning) {  
    // CAS修改任务执行状态,允许尝试中断任务执行线程则修改状态为INTERRUPTING,否则修改为CANCELLED
    if (!(state == NEW && STATE.compareAndSet  
          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))  
        // CAS修改任务执行状态失败,当前任务执行状态已不是NEW,可能任务已执行完成,或已取消,取消失败
        return false;  
    try {
        if (mayInterruptIfRunning) {  
            // 允许尝试中断任务执行线程
            try {  
                Thread t = runner;  
                if (t != null)  
                    // 发送中断信号
                    t.interrupt();  
            } finally {
                // 最终将任务状态转换为INTERRUPTED,不保证其它线程即时可见
                STATE.setRelease(this, INTERRUPTED);  
            }  
        }  
    } finally {  
        // 唤醒所有等待执行结果的线程
        finishCompletion();  
    }  
    return true;  
}

构造函数

FutureTask的任务逻辑通过Callable来表示,因此假如构造函数传入一个Runnable,则将其转为Callable

public FutureTask(Callable<V> callable) {  
    if (callable == null)  
        throw new NullPointerException();  
    this.callable = callable;  
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {  
    // Runnable先转为Callable
    this.callable = Executors.callable(runnable, result);  
    this.state = NEW;
}

任务执行器

Executor接口是任务执行器的最基本抽象,只包含如何执行任务的逻辑(即如何使用线程去运行任务代码),且只接受Runnable接口描述的任务

package java.util.concurrent;

public interface Executor {
    void execute(Runnable command);  
}

ExecutorService接口继承Executor接口,在Executor的基础上加入了任务执行器生命周期的管理,并且可接受Callable接口描述的任务,且支持任务的批量提交

package java.util.concurrent;

public interface ExecutorService extends Executor {  
    void shutdown();  
    List<Runnable> shutdownNow();  
    boolean isShutdown();  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    <T> Future<T> submit(Callable<T> task);  
    <T> Future<T> submit(Runnable task, T result);  
    Future<?> submit(Runnable task);  
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;  
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  
        throws InterruptedException;  
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;  
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
        throws InterruptedException, ExecutionException, TimeoutException;  
}

抽象类AbstractExecutorService实现了ExecutorService接口,实现了关于任务提交的方法,即submit方法、invokeAll方法和invokeAny方法

public abstract class AbstractExecutorService implements ExecutorService {
    //...
}

实现了submit方法,将提交的RunnableCallable任务统一转换为RunnableFuture,然后调用execute方法执行任务,再返回任务执行回执

public Future<?> submit(Runnable task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<Void> ftask = newTaskFor(task, null);  
    execute(ftask);  
    return ftask;  
}

public <T> Future<T> submit(Runnable task, T result) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task, result);  
    execute(ftask);  
    return ftask;  
}

public <T> Future<T> submit(Callable<T> task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task);  
    execute(ftask);  
    return ftask;  
}

newTaskFor方法将提交的RunnableCallable任务统一转换为RunnableFuture,实现类为FutureTask

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {  
    return new FutureTask<T>(runnable, value);  
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {  
    return new FutureTask<T>(callable);  
}

ThreadPoolExecutor是一个重要的任务执行器,通过使用线程池来执行任务,ThreadPoolExecutor继承AbstractExecutorService

public class ThreadPoolExecutor extends AbstractExecutorService {
     // ...
}

这里简单看一下执行任务的最核心方法,execute方法的实现

public void execute(Runnable command) {  
    if (command == null)  
        throw new NullPointerException();
    int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) {  
        // 线程池线程数小于核心线程数,增加一个线程并执行任务
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    // 线程池线程数大于等于核心线程数,且线程池仍在运行中,将任务加入任务队列
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            // 重新检查线程池状态,若线程池不在运行中,且成功将任务从任务队列移除
            // 拒绝执行任务
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            // 仍需执行任务,若线程池线程数为0,则增加一个线程并执行任务
            addWorker(null, false);  
    }  
    else if (!addWorker(command, false))  
        // 增加线程失败,拒绝任务
        reject(command);  
}