本文源码基于JDK11
任务
无返回值的任务,由Runnable
接口来描述,通过实现run
方法来指定任务逻辑
package java.lang;
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
有返回值的任务,由Callable
接口来描述,通过实现call
方法来指定任务逻辑
package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Executors
工具类,通过RunnableAdapter
实现Runnable
转Callable
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}
Executors
工具类,Runnable
转Callable
工具方法
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
任务执行回执
Future
接口抽象出的概念类似任务执行回执,在任务提交到任务执行器后,任务执行器返回这个回执。通过这个回执,我们可以在未来某个时刻查看任务是否已执行完成,获取任务的执行结果,或取消任务的执行
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
任务与任务执行回执合一
RunnableFuture
接口同时继承了Runnable
接口和Future
接口,RunnableFuture
接口的实现类需要实现在任务完成后能够取出任务的执行结果
package java.util.concurrent;
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask
是RunnableFuture
接口的主要实现类,下面是FutureTask
重点部分代码的分析
package java.util.concurrent;
// 实现RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V> {
//...
}
Runnable
实现部分
// 任务执行状态
private volatile int state;
// 任务执行状态常量
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 执行任务的线程
private volatile Thread runner;
// 要执行的任务
private Callable<V> callable;
// 任务结果
private Object outcome;
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
// 任务正被其它线程执行,或已被执行过了,则退出
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 执行任务异常,处理任务结果
setException(ex);
}
if (ran)
// 正常执行任务,处理任务结果
set(result);
}
} finally {
runner = null;
// 处理中断
int s = state;
if (s >= INTERRUPTING)
// 仅确保cancel方法发出中断信号时,当前线程尚未从run方法返回
// 不负责重置线程中断信号
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// 转换任务状态为COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 任务的执行未被取消或中断
// 任务执行结果放到outcome
outcome = v;
// 最终将任务状态转换为NORMAL,不保证其它线程即时可见
STATE.setRelease(this, NORMAL);
// 唤醒所有等待执行结果的线程
finishCompletion();
}
}
protected void setException(Throwable t) {
// 转换任务状态为COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 任务的执行未被取消或中断
// 任务执行异常放到outcome
outcome = t;
// 最终将任务状态转换为EXCEPTIONAL,不保证其它线程即时可见
STATE.setRelease(this, EXCEPTIONAL);
// 唤醒所有等待执行结果的线程
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 等待任务执行结果的线程链表不为空
if (WAITERS.weakCompareAndSet(this, q, null)) {
// CAS修改waiters为空成功,确保只有一条线程进行等待线程的唤醒工作
// 遍历等待线程链表
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
Future
实现部分
// 任务执行结果等待线程链表
private volatile WaitNode waiters;
// 等待线程链表节点
static final class WaitNode {
// 等待线程
volatile Thread thread;
// 链表下一个节点
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 任务执行未完成,线程进入waiting状态
s = awaitDone(false, 0L);
// 返回任务执行结果
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
// 任务执行未完成,线程进入timed waiting状态
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
// awaitDone方法返回时任务执行仍未完成,即等待超时,抛出异常
throw new TimeoutException();
// 返回任务执行结果
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
// 任务已执行完成,且已看见最终任务执行状态的更新
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// 任务已执行完成,未看见最终任务执行状态的更新,继续等待
Thread.yield();
else if (Thread.interrupted()) {
// 任务未执行完成
// 且线程已被中断,当前线程从等待任务执行结果链表中移除,抛出异常并重置中断状态
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
// 任务未执行完成且线程未被中断
// 且尚未为当前线程创建WaitNode对象
if (timed && nanos <= 0L)
// 当前线程等待任务执行结果是有限期的
// 且设置的超时时间小于等于0,即已超时,直接返回
return s;
// 当前线程等待任务执行结果是无限期等待的,或有限期等待且设置的超时时间大于0
// 为当前线程创建WaitNode对象
q = new WaitNode();
}
else if (!queued)
// 任务未执行完成且线程未被中断
// 且已为当前线程创建WaitNode对象
// 且该WaitNode对象尚未加入等待线程链表
// CAS尝试将WaitNode对象加入等待线程链表
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
// 任务未执行完成且线程未被中断
// 且已为当前线程创建WaitNode对象
// 且该WaitNode对象已加入等待线程链表
// 且当前线程等待任务执行结果是有限期的
final long parkNanos;
if (startTime == 0L) {
// 未设置线程等待起始时间,当前线程准备首次转换为timed waiting状态
// 设置线程等待起始时间
startTime = System.nanoTime();
if (startTime == 0L)
// 防止系统时间错误
startTime = 1L;
parkNanos = nanos;
} else {
// 已设置线程等待起始时间,当前线程已从timed waiting状态被唤醒
// 计算线程等待时间
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
// 等待时间已等于或超出预期,当前线程从等待任务执行结果链表中移除,并返回
removeWaiter(q);
return state;
}
// 等待时间未到预期,继续等待剩余时间
parkNanos = nanos - elapsed;
}
if (state < COMPLETING)
// 检查当前任务执行最新状态,若任务未执行完成,则当前线程转换为timed waiting状态
LockSupport.parkNanos(this, parkNanos);
}
else
// 任务未执行完成且线程未被中断
// 且已为当前线程创建WaitNode对象
// 且该WaitNode对象已加入等待线程链表
// 且当前线程等待任务执行结果是无限期的
// 当前线程转换为waiting状态
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
// 任务执行最终状态为NORMAL,即正常完成,返回执行结果
return (V)x;
if (s >= CANCELLED)
// 任务执行最终状态为CANCELLED,INTERRUPTING,INTERRUPTED,抛出CancellationException
throw new CancellationException();
// 任务执行最终状态为EXCEPTIONAL,再次抛出执行任务时抛出的异常
throw new ExecutionException((Throwable)x);
}
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS修改任务执行状态,允许尝试中断任务执行线程则修改状态为INTERRUPTING,否则修改为CANCELLED
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// CAS修改任务执行状态失败,当前任务执行状态已不是NEW,可能任务已执行完成,或已取消,取消失败
return false;
try {
if (mayInterruptIfRunning) {
// 允许尝试中断任务执行线程
try {
Thread t = runner;
if (t != null)
// 发送中断信号
t.interrupt();
} finally {
// 最终将任务状态转换为INTERRUPTED,不保证其它线程即时可见
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
// 唤醒所有等待执行结果的线程
finishCompletion();
}
return true;
}
构造函数
FutureTask
的任务逻辑通过Callable
来表示,因此假如构造函数传入一个Runnable
,则将其转为Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// Runnable先转为Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
任务执行器
Executor
接口是任务执行器的最基本抽象,只包含如何执行任务的逻辑(即如何使用线程去运行任务代码),且只接受Runnable
接口描述的任务
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
ExecutorService
接口继承Executor
接口,在Executor
的基础上加入了任务执行器生命周期的管理,并且可接受Callable
接口描述的任务,且支持任务的批量提交
package java.util.concurrent;
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
抽象类AbstractExecutorService
实现了ExecutorService
接口,实现了关于任务提交的方法,即submit
方法、invokeAll
方法和invokeAny
方法
public abstract class AbstractExecutorService implements ExecutorService {
//...
}
实现了submit
方法,将提交的Runnable
和Callable
任务统一转换为RunnableFuture
,然后调用execute
方法执行任务,再返回任务执行回执
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
newTaskFor
方法将提交的Runnable
和Callable
任务统一转换为RunnableFuture
,实现类为FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
ThreadPoolExecutor
是一个重要的任务执行器,通过使用线程池来执行任务,ThreadPoolExecutor
继承AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
}
这里简单看一下执行任务的最核心方法,execute
方法的实现
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 线程池线程数小于核心线程数,增加一个线程并执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池线程数大于等于核心线程数,且线程池仍在运行中,将任务加入任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 重新检查线程池状态,若线程池不在运行中,且成功将任务从任务队列移除
// 拒绝执行任务
reject(command);
else if (workerCountOf(recheck) == 0)
// 仍需执行任务,若线程池线程数为0,则增加一个线程并执行任务
addWorker(null, false);
}
else if (!addWorker(command, false))
// 增加线程失败,拒绝任务
reject(command);
}