一 用法
Future是用来异步获取值得。线程或者Runnable的run方法没有返回值,所以在实际项目中需要有返回值的异步场景下,就需要用Future来获取异步任务执行后的返回值。如以下代码,主线程会一直等待,知道Future返回:
package com.youngthing.threads.future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* Future demo
* created at 15/03/2022
*
* @author 花书粉丝
* <a href="mailto://yujianbo@chtwm.com">yujianbo@chtwm.com</a>
* @since 1.0.0
*/
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Long> future = new FutureTask<Long>(()-> {
Thread.sleep(1000);
return System.currentTimeMillis();
});
new Thread(future).start();
final Long aLong = future.get();
System.out.println(aLong);
}
}
另外一个用法就是
package com.youngthing.threads.future;
import java.util.concurrent.*;
/**
* Future demo
* created at 15/03/2022
*
* @author 花书粉丝
* <a href="mailto://yujianbo@chtwm.com">yujianbo@chtwm.com</a>
* @since 1.0.0
*/
public class FuturePoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final ExecutorService executorService = Executors.newCachedThreadPool();
final Future<String> future = executorService.submit(() -> {
Thread.sleep(1000);
return "时间 : " + System.currentTimeMillis();
});
System.out.println(future.get());
executorService.shutdown();
}
}
但是这个要注意,执行完毕了,需要关闭executor service。否则线程会一直wait。
二 原理
现在面试必卷原理。而且JDK的Future,因为使用了一个大名鼎鼎的算法Treiber stack,所以更容易问到。但是我们只关注两点,一是状态变化,而是Treiber栈。
2.1 状态转换
FutureTask内部有个状态字段state。
因为状态不可逆,所以这是个一次性任务,在实际应用中最好不要多个线程去执行同一个FutureTask。
2.1 等待与唤醒
Treiber stack是一个无锁栈,物理结构是一个单向链表,其入栈和出栈均使用CAS实现。而CAS的实现也有所不同,JDK8使用的是Unsafe,而JDK9使用的是VarHandler。其他线程调用get时会把线程加入到这个等待栈。然后调用LockSupport.park将线程进入等待状态,如以下代码。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
任务完成后,遍历整个等待栈,调用LockSupport.unpark将线程挨个唤醒。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}