0
点赞
收藏
分享

微信扫一扫

FutureTask源码解读——阻塞获取异步计算结果(阻塞、取消、装饰器、适配器、Callable)



天青色等烟雨,而我在等你,微信公众号搜索:徐同学呀,持续更新肝货,快来关注我,和我一起学习吧~


更多JUC源码解读系列文章请持续关注​​JUC源码解读文章目录JDK8​​!


文章目录


  • ​​一、前言​​
  • ​​二、原理简述​​
  • ​​三、基本结构​​

  • ​​1、基本定义​​
  • ​​2、构造函数​​
  • ​​3、核心函数​​

  • ​​(1)get阻塞获取结果​​
  • ​​awaitDone阻塞等待​​
  • ​​(2)cancel取消任务中断工作线程​​
  • ​​(3)run执行任务代码并唤醒所有等待线程​​
  • ​​(4)runAndReset重复执行​​
  • ​​(5)无锁化​​


  • ​​四、实际应用​​
  • ​​五、总结​​


一、前言

​FutureTask​​​继承自​​Runnable​​​,所以也可以实现异步执行的效果,但是和常规的异步执行方式不同,常规异步只要求异步的过程是正确的就可以了,而​​FutureTask​​不仅可以知道异步执行的状态,还可以知道异步结果。那它是如何实现的呢?

​FutureTask​​在JUC中是一个比较重要的类:


  • 它是​​ScheduledThreadPoolExecutor​​​内部类​​ScheduledFutureTask​​​的父类,​​ScheduledThreadPoolExecutor​​​实现延迟和周期性调度功能时调用的就是​​FutureTask​​的函数。
  • 再者线程池的抽象父类​​AbstractExecutorService​​​中有一个函数​​submit()​​​,也是可以提交任务异步执行,其内部通过将任务类包装成​​FutureTask​​提交给工作线程异步执行,更是被“面试圣经”总结为是第三种实现线程的方式。这种说法是肤浅的,感知肤浅正是探索底层的开始。

//AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

二、原理简述

​FutureTask​​​作为​​Runnable​​​的子类,它就像是一个装饰器在​​Runnable​​异步执行的功能上,又增加了可以获取异步执行状态以及结果的功能:


  • 其内部维护了一个​​Callable​​​类型的成员变量,任务代码会包装成​​callable​​​,​​FutureTask​​​直接调用​​Callable.call()​​执行任务代码并返回结果。
  • 还维护了一个链表实现的栈,外部获取结果的线程在任务没有执行完前都会被压入栈并阻塞(​​awaitDone​​​),任务完成唤醒所有阻塞线程(​​finishCompletion​​)。

三、基本结构

​FutureTask​​实现了接口​​RunnableFuture​​,​​RunnableFuture​​继承了接口​​Runnable​​和​​Future​​。

FutureTask源码解读——阻塞获取异步计算结果(阻塞、取消、装饰器、适配器、Callable)_FutureTask

1、基本定义

​FutureTask​​​有7个状态:​​NEW​​​(新建)、​​COMPLETING​​​(正在完成)、​​NORMAL​​​(正常完成)、​​EXCEPTIONAL​​​(异常完成)、​​CANCELLED​​​(被取消)、​​INTERRUPTING​​​(正在中断)、​​INTERRUPTED​​(被中断)。如下是状态流转:


  • ​NEW -> COMPLETING​​ :此时get会被阻塞,并将当前线程放入阻塞栈中。
  • ​NEW -> COMPLETING -> NORMAL​​​ :此时​​outcome​​​是​​callable​​的运行结果。
  • ​NEW -> COMPLETING -> EXCEPTIONAL​​​ :此时​​outcome​​​是​​callable​​的运行异常。
  • ​NEW -> CANCELLED​​​ :调用了​​cancel(false)​​取消任务,删除并唤醒所有等待的线程。
  • ​NEW -> INTERRUPTING -> INTERRUPTED​​​ :调用了​​cancel(true)​​中断任务,删除并唤醒所有等待的线程。

​FutureTask​​的两大特点阻塞和取消:


  • 阻塞是由​​waiters​​属性实现,它是一个由链表实现的栈,先进后出。
  • 取消是由​​runner​​​属性实现,​​runner​​​以​​CAS​​​的方式记录当前运行线程,运行完成会再次设置为null,运行过程中取消,将调用​​runner.interrupt()​​中断运行线程,在线程池中取消是中断工作线程。

public class FutureTask<V> implements RunnableFuture<V> {

private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
//底层调用的任务
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
//记录当前计算线程,用于取消时,中断计算。
private volatile Thread runner;
/** Treiber stack of waiting threads */
//等待栈
private volatile WaitNode waiters;
}

2、构造函数

构造函数有两种,一种参数类型是​​Callable​​​,可直接赋值给成员变量​​callable​​​;另一种参数类型是​​Runnable​​​,并可传入一个​​result​​,不过没什么用。

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
//设置进来的runnable 会被适配成callable(RunnableAdapter)
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

​Runnable​​​会被包装成一个实现了​​Callable​​​的​​RunnableAdapter​​​赋值给​​callable​​​。适配的过程也可以看出​​result​​是怎么传进去就会怎么返回。

//Executors
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
//将Runnable适配成Callable
return new RunnableAdapter<T>(task, result);
}
//适配器、典型的适配器模式
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
//传入的result并没有做任何处理
return result;
}
}

所以官方给的函数​​AbstractExecutorService#submit(java.lang.Runnable, T)​​​,也可以传入​​Runnable​​​类型的任务和​​result​​​,不过只能判断任务是否完成或者取消任务,外部​​get​​​的​​result​​​还是传进去的​​result​​,没有太大的意义。

3、核心函数

(1)get阻塞获取结果

​get()​​​有两种,一种是会等待计算结束返回,一种是加了超时时长​​timeout​​​,get线程等待​​timeout​​​时长,如果此时还没有完成就抛出异常​​TimeoutException​​。

/**
* 会一直阻塞到计算结束,可以被打断
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//运行未完成,放入等待栈中
s = awaitDone(false, 0L);
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
//阻塞时间到,若还没有完成就抛异常
throw new TimeoutException();
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
//如果是正常结束,返回结果
return (V)x;
//取消或者中断的就抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();
//其他状态的抛异常EXCEPTIONAL
throw new ExecutionException((Throwable)x);
}
awaitDone阻塞等待

阻塞的核心代码就是​​awaitDone​​,get时若正在计算,将会被放入等待栈阻塞,直到超时时间到,或者被唤醒,或者被中断,然后返回当前的状态。


  1. 当前get线程被打断,删除等待节点,并抛出​​InterruptedException​​。
  2. 若此时​​s > COMPLETING​​​有可能完成、取消、中断,将等待节点(​​WaitNode​​)的thread设置为null,并返回当前状态。
  3. 若​​s == COMPLETING​​​说明正在完成,暂停当前get线程,让出对​​cpu​​的占用,执行其他get线程。
  4. 若当前get线程还没有入等待栈,实例化一个​​WaitNode​​​,并​​cas​​入栈。
  5. 最后阻塞,阻塞分为时间阻塞和永久阻塞,阻塞时间到,删除等待节点并返回当前状态。

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//循环阻塞,循环终止- 阻塞时间到,或者被唤醒,然后返回当前的状态
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
//完成,取消,中断
if (q != null)
q.thread = null;
//返回结果状态
return s;
}
else if (s == COMPLETING) // cannot time out yet
//正在完成,暂停当前线程,执行其他get线程
Thread.yield();
else if (q == null)
//NEW 状态,新建一个WaitNode
q = new WaitNode();
else if (!queued)
//cas入栈
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//需要时间阻塞
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//不需要等待,删除等待节点
removeWaiter(q);
return state;
}
//阻塞一段时间
LockSupport.parkNanos(this, nanos);
}
else
//timed false 会一直阻塞到 计算完成, 需要唤醒
LockSupport.park(this);
}
}

/**
* 栈,node.thread=null的节点会被删除
* @param node
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
// restart on removeWaiter race
//循环删除node.thread = null的节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

(2)cancel取消任务中断工作线程

调用cancel()可能是被中断(​​mayInterruptIfRunning=true​​​)或者是主动取消(​​mayInterruptIfRunning=false​​)。


  • 中断取消,设置​​state​​​为​​INTERRUPTING​​​,并调用​​runner.interrupt()​​​中断当前运行线程,设置​​state​​​为​​INTERRUPTED​​,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。
  • 主动取消,设置​​state​​​为​​CANCELLED​​,最后清空等待栈中所有阻塞节点并唤醒所有等待的线程。

从代码可以看出只有​​state​​​为​​NEW​​才能被取消。

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
//cass设置 state为INTERRUPTING or CANCELLED
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//状态没有修改成功返回false,取消失败
return false;
try { // in case call to interrupt throws exception
//如果是被打断的 就调用t.interrupt();中断线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
//中断当前线程
t.interrupt();
} finally { // final state
//设置state为中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//最后 删除并唤醒所有等待的线程
finishCompletion();
}
return true;
}
//删除并唤醒所有等待的线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环cas WaitNode 为null,删除所有等待的线程
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//循环唤醒唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
//最后了,break
break;
q.next = null; // unlink to help gc 设置为null 有利于gc
q = next;
}
break;
}
}
//钩子函数
done();
callable = null; // to reduce footprint
}

(3)run执行任务代码并唤醒所有等待线程

​FutureTask​​​是会被传给​​ThreadPoolExecutor.Worker​​​,由线程池启动工作线程,然后调用​​FutureTask​​​的​​run()​​​函数,​​run()​​​又调用​​callable.call()​​​。​​run()​​​会将结果设置给​​outcome​​。

public void run() {
//新任务-->执行UNSAFE.compareAndSwapObject(this, runnerOffset,
// null, Thread.currentThread()))
//新任务会将当前线程设置给runner 这里的作用是乐观锁,保证下面的执行流程是只有一个线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用callable的call,并设置返回值
//如果传进来的任务是Runnable,会被转换成callable
result = c.call();
//若运行异常,ran=false,异常会被捕获处理
//所以传进来的任务的run或者call代码块最好try-catch下
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//设置异常给outcome
setException(ex);
}
if (ran)
//运行正常完成,设置结果给outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//最后 runner=null 相当于是释放锁
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//如果状态是被打断,让出cpu
handlePossibleCancellationInterrupt(s);
}
}

执行细节如下:


  • 新任务​​CAS​​​设置当前线程给​​runner​​​,这里的作用是乐观锁保证下面的执行流程只有一个线程,并且外部可以通过​​runner​​随时中断执行。
  • 直接调用​​callable.call()​​执行任务代码。
  • 中途出现异常,将异常设置给​​outcome​​,状态流转为异常完成,并唤醒所有等待的线程。

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置异常
outcome = t;
//设置异常状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒所有线程
finishCompletion();
}
}
  • 正常运行完毕将运行结果​​result​​​设置给​​outcome​​,状态流转为正常完成,并唤醒所有等待的线程。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置为完成状态
outcome = v;
//设置正常状态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//完成操作-删除并唤醒所有等待的线程
finishCompletion();
}
}

private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//循环cas WaitNode 为null,删除所有等待的线程
//这里cas删除 其实是为了if里面的操作线程安全无锁
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//循环唤醒唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
//最后了,break
break;
q.next = null; // unlink to help gc 设置为null 有利于gc
q = next;
}
break;
}
}
//钩子函数
done();

callable = null; // to reduce footprint
}
  • 最终​​runner​​​设置为​​null​​​,相当于释放锁;如若此时状态为被打断状态​​INTERRUPTING​​​,需要让出​​cpu​​。
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
//暂停当前线程,让出cpu时间片
Thread.yield(); // wait out pending interrupt
}

(4)runAndReset重复执行

​runAndReset()​​​和​​run()​​​类似,但是没有把​​result​​​设置给​​outcome​​​,函数返回为​​boolean​​,用于是否重复执行。

protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
//如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查
//不会再往下执行ran=false
//所以传进来的任务run里需要自己try-catch
ran = true;
} catch (Throwable ex) {
//设置异常给outcome
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
//如果状态是被打断,让出cpu
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

(5)无锁化

​FutureTask​​​的几个成员变量并没有使用悲观锁​​Lock​​​或者​​synchronized​​​,而是用了​​cas​​​乐观锁,而且也用了​​volatile​​​修饰,使得​​state​​​的流转,​​runner​​​的设置,​​waiters​​的入栈出栈线程安全。

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
//这不就是乐观锁吗 无锁化-牛逼
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}

四、实际应用

​FutureTask​​可以配合线程池使用,也可以单独启动线程。

(1)配合线程池使用。

LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5,
30, TimeUnit.SECONDS, workQueue);
Future<Integer> future = poolExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int i = 100;
int j = 100;
int sum = i + j;
Thread.sleep(2000);
return sum;
}
});
long s = System.currentTimeMillis();
//获得计算结果
Integer result = future.get();
long e = System.currentTimeMillis();
System.out.println("result=" + result + ",ms=" + (e-s));
//result=200,ms=2016

(2)单独启动线程

List<FutureTask> taskList = new ArrayList<FutureTask>();
for (int i = 0; i < 3; i++) {
int j = i;
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return j + 10;
}
});
Thread thread = new Thread(futureTask);
thread.start();
taskList.add(futureTask);
}
//批量获取结果
for (FutureTask futureTask : taskList) {
System.out.println(futureTask.get());
}

需要注意:

无论是线程池submit提交任务还是批量启动多个线程使用​​FutureTask​​,切不可在一个for循环里一边异步执行一边获取结果,这样会使得整个过程因为阻塞获取结果变成单线程。

应该​批量提交任务,批量获取结果​。

五、总结


  1. ​FutureTask​​的特点异步执行、阻塞获取结果、可取消。
  2. ​FutureTask​​​运用了装饰器和适配器模式。装饰器的体现是在​​Runnable​​​异步执行的基础上增加了异步阻塞获取结果和取消的功能;适配器的体现是当传入的是任务是​​Runnable​​​类型时会被适配成一个实现了​​Callable​​​的​​RunnableAdapter​​。
  3. ​callable​​​是​​FutureTask​​​的成员变量,无法单独实现线程,配合​​FutureTask​​使用最佳。
  4. 任务正常完成​​outcome​​​的值是​​callable.run​​​的运行结果;任务异常完成​​outcome​​是异常。
  5. 成员变量​​runner​​的作用,可在外围中断取消正在执行的任务,也有保证任务执行线程安全。
  6. 成员变量​​waiters​​​,​​get​​获取结果当未完成前一个个入栈阻塞,完成时全部出栈唤醒。
  7. 因为​​run​​​和​​runAndReset​​​对异常不作为,建议任务代码自行​​try-catch​​。

PS: ​如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!



举报

相关推荐

0 条评论