一、线程池
线程池的工作主要是控制运行的线程数量:超出线程处理能力的任务先放到队列中,等到线程池有空闲线程,再从队列取出任务来执行
线程池优点:
- 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度,当任务到达时,任务不需要等待线程创建就能立即执行
- 提高线程的可管理性,线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性;使用线程池可以进行统一的分配、调优和监控
二、Executors 创建线程池的几种方式
1.Executors.newFixedThreadPool(int)
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是无界的LinkedBlockingQueue
// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.Executors.newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
newSingleThreadExecutor创建的线程池corePoolSize和maximumPoolSize值都设置为1,它使用的是无界的LinkedBlockingQueue
// Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3.Executors.newCachedThreadPool()
创建一个可缓存线程池,如果线程数量超出处理需要,可灵活回收空闲线程;若无可回收,则新建线程
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的是SynchronousQueue,多余的空闲线程的存活时间为60秒
// Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4.Executors.newScheduledThreadPool(int)
创建一个定时线程池,ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor
ScheduledExecutorService 使用的是 DelayedWorkQueue 延迟队列
schedule 方法,创建一个给定延迟的一次性任务,在延迟时间后执行;
scheduleAtFixedRate 方法,创建一个给定初始延迟的周期性任务,下次执行时间是上一次任务开始时间+给定的间隔时间;若上一次任务执行时间超过间隔,则在上一次任务结束后立即开始下一次;
scheduleWithFixedDelay 方法,创建一个给定初始延迟的周期性任务,下次执行时间是上一次任务结束时间+给定的间隔时间。
scheduleAtFixedRate 和 scheduleWithFixedDelay 源码的区分点是:scheduleAtFixedRate 设置 ScheduledFutureTask 的 period 是正数;scheduleWithFixedDelay 设置 ScheduledFutureTask 的 period 是负数。进而 setNextRunTime 方法执行不同的逻辑。
// Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor.java
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
// ThreadPoolExecutor.java
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
如果是非周期任务,则执行 ScheduledFutureTask.super.run();如果是周期任务,则执行 ScheduledFutureTask.super.runAndReset(),并执行 reExecutePeriodic(),把任务重新放入队列,再执行 addWorker,即会一直创建 worker,直到创建满 corePoolSize 个 worker
通过源码可以看出,ScheduledThreadPoolExecutor$ScheduledFutureTask 的成员变量 time 保存着任务要执行的时间点
// ScheduledThreadPoolExecutor$ScheduledFutureTask.java
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
栗子:
schedule 方法把任务放到延迟队列中,工作线程使用 queue.take() 获取任务;take() 的实现是在循环中取得队首任务的 delay,执行 condition.awaitNanos(delay) 进行等待,直到任务到期
@Test
public void testSchedule() throws InterruptedException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println(Thread.currentThread().getName());
}, 1, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(5);
}
@Test
public void testScheduleAtFixedRate() throws InterruptedException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName());
}, 1, 1, TimeUnit.SECONDS);
TimeUnit.MINUTES.sleep(5);
}
5.Executors.newSingleThreadScheduledExecutor()
// Executors.java
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
6.Executors.newWorkStealingPool()
// Executors.java
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
三、线程池使用
1.ThreadPoolExecutor
除 newWorkStealingPool(基于 ForkJoinPool)外,上面这些线程池都是通过ThreadPoolExecutor实现的
// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
继承关系图:
2.线程池使用
public class PoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// ExecutorService service = Executors.newCachedThreadPool();
// ExecutorService service = Executors.newSingleThreadExecutor();
try{
for (int i = 0 ; i < 5; i++){
threadPool.submit(()->{
System.out.println(Thread.currentThread().getName());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
结果:
四、线程池7大参数
1.corePoolSize:线程池中的常驻核心线程数
在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务;当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
2.maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1
3.keepAliveTime:多余的空闲线程的存活时间
当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止,默认情况下,只有当线程池大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize
4.unit:keepAliveTime的单位
5.workQueue:任务队列,存放被提交但尚未被执行的任务
6.threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般用默认的即可
7.handler:拒绝策略
表示当队列满了并且工作线程数大于等于线程池的最大线程数(maximumPoolSize)时,如何拒绝新提交的Runnable的策略
// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
五、线程池底层工作原理
1.在创建线程池后,等待提交过来的任务请求
2.当调用execute()方法添加一个请求任务时,线程池会做以下判断
如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
如果正在运行的线程数量大于或等于corePoolSize,那么把这个任务放到队列;
如果这时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么创建非核心线程立刻执行这个任务;
如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来处理
3.当一个线程完成任务时,它会从队列中取下一个任务来执行
4.当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉,所以线程池所有任务完成后它最终会收缩到corePoolSize的大小
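下面用一个小例子验证上述流程(示意代码,池参数、类名和打印内容均为假设):corePoolSize=2、队列容量 3、maximumPoolSize=5,连续提交 6 个耗时任务,可以看到前 2 个任务由核心线程执行,第 3~5 个进入队列,第 6 个触发创建非核心线程。
import java.util.concurrent.*;

public class PoolFlowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));
        for (int i = 1; i <= 6; i++) {
            pool.execute(() -> {
                try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException ignored) { }
            });
            // poolSize 表示已创建的 worker 数,queueSize 表示排队中的任务数
            System.out.println("已提交 " + i + " 个任务, poolSize=" + pool.getPoolSize()
                    + ", queueSize=" + pool.getQueue().size());
        }
        pool.shutdown();
    }
}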
六、JDK内置的拒绝策略
拒绝策略:等待队列已经排满,再也塞不下新任务,同时线程池中的线程数也达到了maximumPoolSize,无法继续为新任务服务,这时候就需要拒绝策略机制合理地处理这个问题
内置拒绝策略均实现了RejectedExecutionHandler接口
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列,尝试再次提交当前任务
DiscardPolicy:直接丢弃任务,不予处理也不抛出异常,如果允许任务丢失,这是最好的一种方案
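下面以 CallerRunsPolicy 为例演示拒绝策略的效果(示意代码,池参数、类名为假设):当队列和最大线程数都占满后,新任务会退回给提交任务的 main 线程自己执行,输出中会出现 main 线程名。
import java.util.concurrent.*;

public class RejectPolicyDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // 队列满且线程满时,任务回退给调用者执行
        for (int i = 0; i < 5; i++) {
            int temp = i;
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 执行任务:" + temp);
                try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
    }
}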
七、自定义线程池
因为newFixedThreadPool、newSingleThreadExecutor使用的是new LinkedBlockingQueue<>()无界队列,newCachedThreadPool将maximumPoolSize设置为Integer.MAX_VALUE,在实际生产中可能出现任务堆积耗尽内存或创建过多线程的问题
可以用ThreadPoolExecutor自定义线程池
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
for(int i = 0; i < 10; i++){
int temp = i;
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "\t 执行任务:" + temp);
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
结果:
因为要处理的任务有10个,自定义线程池设置corePoolSize=2,maximumPoolSize=5,workQueue大小为3,线程池最多同时容纳5个运行中的任务加3个排队任务共8个,再来任务便会触发拒绝策略;拒绝策略是DiscardOldestPolicy,所以会抛弃队列中等待最久的任务
成功执行了8个任务,丢失了2个任务
八、线程池原理
1.线程池状态和work线程数量
线程池状态和线程数量合并到ctl这一个变量中,其中 int 的高 3 位表示线程池状态,低 29 位表示 worker 线程数量
// jdk11
// ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
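可以用下面的小例子按同样的位运算验证打包/解包逻辑(独立示意代码,常量计算方式与上面源码一致,类名为假设):
// 独立示意:按 ThreadPoolExecutor 的位运算拆解 ctl
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;   // 低 29 位全 1
    static final int RUNNING = -1 << COUNT_BITS;

    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                        // RUNNING 状态 + 5 个 worker
        System.out.println(runStateOf(c) == RUNNING);     // true,高 3 位表示状态
        System.out.println(workerCountOf(c));             // 5,低 29 位表示 worker 数量
    }
}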
2.提交任务
// jdk11
// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
对于 execute 方法:
- 先比较 workerCount 与 corePoolSize,若 workerCount < corePoolSize, 执行addWorker(command, true)。
- 若 workerCount >= corePoolSize, workQueue.offer(command)。队列的 offer 方法,如果插入成功返回 true, 失败返回 false。
- 若队列插入失败(队列满了),执行 addWorker(command, false)。
- 若 addWorker(command, false) 执行失败,说明已达到最大线程数(maximumPoolSize),执行拒绝策略。
对于 addWorker 方法:
- workerCount 达到上限(core 为 true 时是 corePoolSize,否则是 maximumPoolSize),直接返回 false。
- 没有达到上限,则创建 Worker 实例,firstTask 保存的是提交的 task,thread 保存的是由 ThreadFactory 以该 Worker 作为 Runnable 创建的线程。
- 取出 Worker 实例中的 thread,执行 start(),实际执行的是 Worker 的 run 方法。
- 执行 runWorker 方法
- 最后执行 processWorkerExit,worker 线程从 workers 中移除。
对于 runWorker 方法:
- 先执行 Worker 的 firstTask 的 run() 方法(通过 submit 提交时,firstTask 是 FutureTask 实例,FutureTask 中的 callable 才是用户实际提交的任务);之后循环执行 getTask 方法,从队列中获得 task。
对于 getTask 方法:
- 比较 workerCount 和 corePoolSize 的大小(未开启 allowCoreThreadTimeOut 时):若 workerCount <= corePoolSize,使用 workQueue.take() 从队列取出 task,若队列为空则一直阻塞;若 workerCount > corePoolSize,使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 从队列取出 task,若等待 keepAliveTime 后没有取到值,则返回 null。
// jdk11
// ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
// Worker.java
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// jdk11
// DefaultThreadFactory.java
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
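结合 getTask 的逻辑,可以用下面的小例子观察非核心线程在空闲超过 keepAliveTime 后被回收(示意代码,池参数、类名为假设):
import java.util.concurrent.*;

public class KeepAliveDemo {
    public static void main(String[] args) throws InterruptedException {
        // corePoolSize=1, maximumPoolSize=3, keepAliveTime=2 秒
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 2L, TimeUnit.SECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ignored) { }
            });
        }
        System.out.println("刚提交完: poolSize=" + pool.getPoolSize()); // 预期 3
        TimeUnit.SECONDS.sleep(5); // 空闲时间超过 keepAliveTime,非核心线程被回收
        System.out.println("空闲之后: poolSize=" + pool.getPoolSize()); // 预期收缩到 1
        pool.shutdown();
    }
}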
3.获得结果
任务提交给 ThreadPoolExecutor 后,ThreadPoolExecutor 执行的是 FutureTask 的 run 方法,FutureTask 实例中的 callable,是用户实际提交的 task 实例
FutureTask 的 run 方法:
- 执行 run 方法时,先通过 CAS 把 Thread runner 由 null 设置为 Thread.currentThread()
- 在 run 方法中,执行 FutureTask 的 Callable<V> callable 的 call 方法
- 通过 CAS 把 FutureTask 的 state 由 NEW 变为 COMPLETING,把执行结果放到 Object outcome,再把 state 设置为 NORMAL
- 执行 finishCompletion 方法,finishCompletion 从 waiters 中获得等待结果的线程,唤醒等待结果的每个线程
// jdk11
// FutureTask.java
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
// jdk11
// WaitNode.java
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
FutureTask 的 get 方法:
- 如果 FutureTask 的 state 的值小于等于 COMPLETING,则进入 awaitDone 方法
- awaitDone 方法中构造 waiters,waiters 是一个链表,存放所有正在等待结果的线程,并通过 LockSupport.park 把要获得结果的线程阻塞
// jdk11
// FutureTask.java
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
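最后补一个 submit + get 的使用例子(示意代码,类名为假设):get() 在任务完成前会把调用线程挂到 waiters 链表上阻塞,任务完成后由 finishCompletion 唤醒并通过 report 返回 outcome。
import java.util.concurrent.*;

public class FutureGetDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<Integer> future = pool.submit(() -> {   // 提交 Callable,run 结束后结果写入 outcome
            TimeUnit.SECONDS.sleep(1);
            return 42;
        });
        System.out.println(future.get());              // main 线程在此阻塞,任务完成后被唤醒,输出 42
        pool.shutdown();
    }
}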