futureTask可以用来异步获取执行结果,也就是说,业务代码的线程和主线程,可以同时执行,在需要获取执行结果的时候,调用其get()方法,就会阻塞主线程,直到获取到执行结果之后,再继续执行主线程的代码,具体的用法,不再详细叙述
使用
public class FutureTaskTest {
public static void main(String[] args) throws Exception{
FutureTask<Integer> futureTask = new FutureTask(() -> {
System.out.println("测试futureTask");
TimeUnit.SECONDS.sleep(3);
return 2;
});
System.out.println("模拟现在需要用到线程执行结果:");
new Thread(futureTask).start();
System.out.println("主线程可以继续执行其他代码...,.");
Integer result = futureTask.get();
System.out.println("获取到执行结果:"+result);
}
}
测试futureTask
主线程可以继续执行其他代码...,.
模拟现在需要用到线程执行结果:
获取到执行结果:2
这个demo就是模拟futureTask的执行过程,在get()方法被调用之后,如果没有获取到执行结果,最后一句代码是无法执行的,会一直阻塞,直到拿到线程的执行结果,才会继续执行主线程
源码
在futureTask源码中,有一个变量,state
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
* 这是官方的注释,并且标明了状态流转的过程
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
/**
* 在构造函数中,设置为new
*/
private static final int NEW = 0;
/**
* 线程正常执行完毕,先通过CAS将state修改为completing
* 是normal的前一个状态
*/
private static final int COMPLETING = 1;
/**
* 线程正常执行完成
*/
private static final int NORMAL = 2;
/**
* 执行线程的时候,如果抛出异常,通过cas修改为exceptional
*/
private static final int EXCEPTIONAL = 3;
/**
* 如果调用了cancel(boolean mayInterruptIfRunning)方法
* 入参的mayInterruptIfRunning为true,就通过cas将state设置为INTERRUPTING
* 如果为false,就通过cas将state修改为cancelled
*
* 已取消
* 被中断
*/
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
/**
* 在调用cancel()方法,入参为true的情况下,如果中断成功,通过cas将state从
* INTERRUPTING修改为INTERRUPTED
*/
private static final int INTERRUPTED = 6;
在调用get()方法,获取线程执行结果的时候,就和state的状态有关系
get()
调用get()方法,会阻塞,直到获取到线程执行结果,才会继续执行后面的代码
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
这段代码的意思是:在获取线程执行结果的时候,
- 如果当前state状态 > COMPLETING,有可能是normal(正常结束)、EXCEPTIONAL等状态,总之是一个终态,那就直接调用report(s),返回执行结果
- 如果不满足,表示当前线程还在执行中,需求等待,会调用awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 1.判断是否被中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 2.如果状态 > 2,表示已经是终态,return
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 3.如果是COMPLETING,表示马上就到终态了,让出cpu执行权限即可,有可能下次for循环,进来的时候,已经是normal状态了
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 4.如果不满足上面条件,并且q是null,就创建一个新的节点,然后将q设置到waiters的队尾
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 5.最终可以看到,会调用lockSupport.park()方法休眠
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
这是awaitDone()的代码,可以看到,最终是调用了LockSupport.park()方法进行阻塞等待,前面还有一系列的判断,我注释中是按照自己的理解去加的,可以看下
run()
run()方法,会在线程start的时候,被调用
public void run() {
/**
* 1.校验当前state是否是new状态且通过cas设置当前线程成功
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
/**
* 2.校验当前线程中的callable是否有效且state为new
*/
if (c != null && state == NEW) {
V result;
boolean ran;
try {
/**
* 2.1 执行call()方法,如果正常执行完毕,设置ran为true
*/
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
/**
* 2.2 假如执行报错,被捕获到,在setException方法中,也会去唤醒阻塞的线程
*/
setException(ex);
}
/**
* 3.如果ran为true,就调用set方法,在set方法中,有一步跟重要的操作,就是通过lockSupport.unPark()唤醒线程
*/
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
这个方法中,这篇博客需要关注的有两个方法,分别是:
setException(ex);和set(result);
protected void set(V v) {
// 需要注意的是:这里会先设置为COMPLETING,然后再设置为NORMAL状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
可以看到,这两个方法都调用了finishCompletion(),唯一的区别是,设置的state状态不同而已
private void finishCompletion() {
// assert state > COMPLETING;
/**
* 1.从waiters中获取到等待的节点
*/
for (WaitNode q; (q = waiters) != null;) {
/**
* 2.通过cas将q设置为null
*/
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
/**
* 3.唤醒线程
*/
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
会发现,这个方法的整体逻辑,和AQS是神似的,都是从排队的队列中,取出排队的节点,然后将节点对应的线程设置为null,然后再唤醒线程
总结
所以,代码看到这里,我们会发现,futureTask,是在调用get()方法的时候,如果线程没有执行完毕,就调用lockSupport.park()暂停线程,在线程正常执行完毕,或者是异常的时候,唤醒线程继续执行