publicclassFutureTask<V> implementsRunnableFuture<V> { /** * 任务执行的状态,初始值是NEW * 可能的状态变化如下: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ privatevolatileint state; privatestaticfinalint NEW = 0; privatestaticfinalint COMPLETING = 1; privatestaticfinalint NORMAL = 2; privatestaticfinalint EXCEPTIONAL = 3; privatestaticfinalint CANCELLED = 4; privatestaticfinalint INTERRUPTING = 5; privatestaticfinalint INTERRUPTED = 6;
/** The underlying callable; nulled out after running */ // 任务需要封装成Callable private Callable<V> callable; /** The result to return or exception to throw from get() */ // 任务执行的结果,或者是异常 private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ // 执行任务的线程 privatevolatile Thread runner; /** Treiber stack of waiting threads */ // 等待任务结果的队列 privatevolatile WaitNode waiters; ****
publicvoidrun(){ // 任务状态不是NEW或CAS设置任务的runner线程失败(不是NULL,说明已经启动过了),直接结束 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 执行任务,返回结果 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 保存任务执行过程中的异常 } // 任务正常执行完成,将结果保存到FutureTask if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get()throws InterruptedException, ExecutionException { int s = state; // 任务状态<=COMPLETING,说明任务还未执行完毕 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
1 2 3 4 5 6 7 8 9 10 11
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) thrownew NullPointerException(); int s = state; // 任务状态<=COMPLETING,说明任务还未执行完毕 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) thrownew TimeoutException(); return report(s); }
privatevoidremoveWaiter(WaitNode node){ if (node != null) { node.thread = null; // 将移除的节点的thread=null, 为移除做标示 retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; //通过 thread 判断当前 q 是否是需要移除的 q节点,因为我们刚才标示过了 if (q.thread != null) pred = q; //当不是我们要移除的节点,就往下走 elseif (pred != null) { //当p.thread==null时,到这里。下面这句话,相当于把q从队列移除。 pred.next = s; //pred.thread == null 这种情况是在多线程进行并发 removeWaiter 时产生的 //此时正好移除节点 node 和 pred, 所以loop跳到retry, 从新进行这个过程。想象一下,如果在并发的情况下,其他线程把pred的线程置为空了。那说明这个链表不应该包含pred了。所以我们需要跳到retry从新开始。 if (pred.thread == null) // check for race continue retry; } //到这步说明p.thread==null 并且 pred==null。说明node是头结点。 elseif (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
最后在get方法中调用report(s),根据状态s的不同进行返回结果或抛出异常。
1 2 3 4 5 6 7 8
private V report(int s)throws ExecutionException { Object x = outcome; //之前我们set的时候,已经设置过这个值了。所以直接用。 if (s == NORMAL) //正常执行结束,返回结果 return (V)x; if (s >= CANCELLED) //被取消或中断了,就抛异常。 thrownew CancellationException(); thrownew ExecutionException((Throwable)x); }