0
点赞
收藏
分享

微信扫一扫

futureTask

冬冬_79d4 2022-02-24 阅读 51

futureTask可以用来异步获取执行结果,也就是说,业务代码的线程和主线程,可以同时执行,在需要获取执行结果的时候,调用其get()方法,就会阻塞主线程,直到获取到执行结果之后,再继续执行主线程的代码,具体的用法,不再详细叙述

使用

public class FutureTaskTest {


    public static void main(String[] args) throws Exception{
        FutureTask<Integer> futureTask = new FutureTask(() -> {
            System.out.println("测试futureTask");
            TimeUnit.SECONDS.sleep(3);
            return 2;
        });
		System.out.println("模拟现在需要用到线程执行结果:");
        new Thread(futureTask).start();
        System.out.println("主线程可以继续执行其他代码...,.");

        Integer result = futureTask.get();
        System.out.println("获取到执行结果:"+result);
    }
}

测试futureTask
主线程可以继续执行其他代码...,.
模拟现在需要用到线程执行结果:
获取到执行结果:2

这个demo就是模拟futureTask的执行过程,在get()方法被调用之后,如果没有获取到执行结果,最后一句代码是无法执行的,会一直阻塞,直到拿到线程的执行结果,才会继续执行主线程

源码

在futureTask源码中,有一个变量,state

/**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     * 这是官方的注释,并且标明了状态流转的过程
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    /**
     * 在构造函数中,设置为new
     */
    private static final int NEW          = 0;
    /**
     * 线程正常执行完毕,先通过CAS将state修改为completing
     * 是normal的前一个状态
     */
    private static final int COMPLETING   = 1;
    /**
     * 线程正常执行完成
     */
    private static final int NORMAL       = 2;
    /**
     * 执行线程的时候,如果抛出异常,通过cas修改为exceptional
     */
    private static final int EXCEPTIONAL  = 3;
    /**
     * 如果调用了cancel(boolean mayInterruptIfRunning)方法
     * 入参的mayInterruptIfRunning为true,就通过cas将state设置为INTERRUPTING
     * 如果为false,就通过cas将state修改为cancelled
     *
     * 已取消
     * 被中断
     */
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    /**
     * 在调用cancel()方法,入参为true的情况下,如果中断成功,通过cas将state从
     * INTERRUPTING修改为INTERRUPTED
     */
    private static final int INTERRUPTED  = 6;

在调用get()方法,获取线程执行结果的时候,就和state的状态有关系

get()

调用get()方法,会阻塞,直到获取到线程执行结果,才会继续执行后面的代码

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

这段代码的意思是:在获取线程执行结果的时候,

  1. 如果当前state状态 > COMPLETING,有可能是normal(正常结束)、EXCEPTIONAL等状态,总之是一个终态,那就直接调用report(s),返回执行结果
  2. 如果不满足,表示当前线程还在执行中,需求等待,会调用awaitDone()
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 1.判断是否被中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 2.如果状态 > 2,表示已经是终态,return
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 3.如果是COMPLETING,表示马上就到终态了,让出cpu执行权限即可,有可能下次for循环,进来的时候,已经是normal状态了
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 4.如果不满足上面条件,并且q是null,就创建一个新的节点,然后将q设置到waiters的队尾
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 5.最终可以看到,会调用lockSupport.park()方法休眠
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

这是awaitDone()的代码,可以看到,最终是调用了LockSupport.park()方法进行阻塞等待,前面还有一系列的判断,我注释中是按照自己的理解去加的,可以看下

run()

run()方法,会在线程start的时候,被调用

public void run() {
    /**
     * 1.校验当前state是否是new状态且通过cas设置当前线程成功
     */
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        /**
         * 2.校验当前线程中的callable是否有效且state为new
         */
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                /**
                 * 2.1 执行call()方法,如果正常执行完毕,设置ran为true
                  */
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                /**
                 * 2.2 假如执行报错,被捕获到,在setException方法中,也会去唤醒阻塞的线程
                 */
                setException(ex);
            }
            /**
             * 3.如果ran为true,就调用set方法,在set方法中,有一步跟重要的操作,就是通过lockSupport.unPark()唤醒线程
             */
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

这个方法中,这篇博客需要关注的有两个方法,分别是:
setException(ex);和set(result);

protected void set(V v) {
    // 需要注意的是:这里会先设置为COMPLETING,然后再设置为NORMAL状态
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

可以看到,这两个方法都调用了finishCompletion(),唯一的区别是,设置的state状态不同而已

private void finishCompletion() {
    // assert state > COMPLETING;
    /**
     * 1.从waiters中获取到等待的节点
     */
    for (WaitNode q; (q = waiters) != null;) {
        /**
         * 2.通过cas将q设置为null
         */
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                /**
                 * 3.唤醒线程
                 */
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

会发现,这个方法的整体逻辑,和AQS是神似的,都是从排队的队列中,取出排队的节点,然后将节点对应的线程设置为null,然后再唤醒线程

总结

所以,代码看到这里,我们会发现,futureTask,是在调用get()方法的时候,如果线程没有执行完毕,就调用lockSupport.park()暂停线程,在线程正常执行完毕,或者是异常的时候,唤醒线程继续执行

举报

相关推荐

0 条评论