juc · 2019-09-20 0

Java创建线程池六种方式及线程池原理

一、线程池

线程池的工作主要是控制运行的线程数量,把大于线程池线程数量的线程放到队列中,等到线程池有空闲线程,再从队列取出任务来执行

线程池优点:

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

二、创建线程池三种方式

1.Excutors.newFixedThreadPool(int)

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue

// Excutors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2.Excutors.newSingleThreadExecutor()

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行

newSingleThreadExecutor创建的线程池corePoolSize和maximumPoolSize值都设置为1,它使用的LinkedBlockingQueue

// Excutors.java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3.Excutors.newCachedThreadPool()

创建一个可缓存线程池,如果线程池长度超出处理需要,可灵活回收空闲线程,若无可回收,则新建线程

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,多余的空闲线程的存活时间60秒

// Excutors.java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

4.Excutors.newScheduledThreadPool()

创建一个定时线程池,ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor

ScheduledExecutorService 使用的是 DelayedWorkQueue 延迟队列

schedule 方法,创建一个给定延迟的的任务,再延迟时间后,执行任务;
scheduleAtFixedRate 方法,创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务结束后,上次任务开始时间+给定的间隔时间;
scheduleWithFixedDelay 方法,创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是,上一次任务结束时间+给定的间隔时间。

scheduleAtFixedRate 和 scheduleWithFixedDelay 源码的区分点是:scheduleAtFixedRate 设置 ScheduledFutureTask 的 period 是正数;scheduleWithFixedDelay 设置 ScheduledFutureTask 的 period 是负数。进而 setNextRunTime 方法执行不同的逻辑。

// Excutors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor.java
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
// ThreadPoolExecutor.java
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

如果是非周期任务,则执行 ScheduledFutureTask.super.run();如果是周期任务,则执行 ScheduledFutureTask.super.runAndReset(),并执行 reExecutePeriodic(),把任务放入队列,再执行 addWorker,则表示会一直创建 worker,直到创建到 corePoolSizeworker

通过源码可以看出,ScheduledThreadPoolExecutor$ScheduledFutureTask 的成员变量 time 保存着任务要执行的时间点

// ScheduledThreadPoolExecutor$ScheduledFutureTask.java
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

long triggerTime(long delay) {
return now() +
    ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

栗子:

schedule 方法把任务放到延迟队列中,使用 queue.take() 获得数据,take() 方法实现是,执行循环,再循环中,得到 delay,执行 condition.awaitNanos(delay) 进行 wait

@Test
public void testSchedule() throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
    executor.schedule(() -> {
        System.out.println(Thread.currentThread().getName());
    }, 1, TimeUnit.SECONDS);

    TimeUnit.SECONDS.sleep(5);
}

@Test
public void testScheduleAtFixedRate() throws InterruptedException {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
    executor.scheduleAtFixedRate(() -> {
        System.out.println(Thread.currentThread().getName());
    }, 1, 1, TimeUnit.SECONDS);

    TimeUnit.MINUTES.sleep(5);
}

5.Excutors.newSingleThreadScheduledExecutor()

// Excutors.java
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

6.Excutors.newWorkStealingPool()

// Excutors.java
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

三、线程池使用

1.ThreadPoolExecutor

线程池都是通过ThreadPoolExecutor实现的

// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

继承关系图:

ThreadPoolExecutor

2.线程池使用

public class PoolTest {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
//        ExecutorService service = Executors.newCachedThreadPool();
//        ExecutorService service = Executors.newSingleThreadExecutor();
        try{
            for (int i = 0 ; i < 5; i++){
                threadPool.submit(()->{
                    System.out.println(Thread.currentThread().getName());
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }

}

结果:

PoolTest

四、线程池7大参数

1.corePoolSize:线程池中的常驻核心线程数

在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务;当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中

2.maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1

3.keepAliveTime:多余的空闲线程的存活时间

当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止,默认情况下,只有当线程池大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize

4.unit:keepAliveTime的单位

5.workQueue:任务队列,被提交单尚未被执行的任务

6.threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可

7.handler:拒绝策略

表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时, 如何来拒绝请求执行的runnable的策略

// ThreadPoolExecutor.java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

五、线程池底层工作原理

1.在创建线程池后,等待提交过来的任务请求

2.当调用execute()方法添加一个请求任务时,线程池会做以下判断

如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

如果正在运行的线程数据大于或等于corePoolSize,那么这个任务放到队列;

如果这时候队列满了且正在运行的线程数量还小于maximumPoolsize,那么还是要创建非核心线程立刻执行这个任务;

如果队列满了且正在运行的线程数量大于或等于maximumPoolsize,那么线程池会启动饱和拒绝策略来执行

3.当一个线程完成任务时,它会从队列中取下一个任务来执行

4.当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉,所以线程池所有任务完成后它最终会收缩到corePoolSize的大小

execute

六、JDK内置的拒绝策略

拒绝策略,等待队列已经排满了,再也塞不下新任务了,同时,线程池中的max线程也到到了,无法继续为新任务服务,这时候我们就需要拒绝策略机制合理的处理这个问题

内置拒绝策略均实现了RejectedExecutionHandler接口

AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量

DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

DiscardPolicy:直接丢弃任务,不予任务处理也不抛弃异常,如果允许任务丢失,这是最好的一种方案

七、自定义线程池

因为newFixedThreadPool、newSingleThreadExecutor使用的是new LinkedBlockingQueue<>()无限长度队列,newCachedThreadPool将maximumPoolSize设置为Integer.MAX_VALUE,在实际生产中可能会出现问题

可以用ThreadPoolExecutor自定义线程池

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        try {
            for(int i = 0; i < 10; i++){
                int temp = i;
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() + "t 执行任务:" + temp);
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
    }

}

结果:

因为要处理的线程10个,自定义线程设置corePoolSize=2,maximumPoolSize=5,workQueue大小为3,最多线程池有8个线程,再来线程便会启动拒绝策略,拒绝策略DiscardOldestPolicy,所有会抛弃队列中等待最久的任务

成功执行了8个任务,丢失了2个任务

ThreadPoolExecutor

八、线程池原理

1.线程池状态和work线程数量

线程池状态和线程数量合并到ctl这一个变量中,其中 int 的前3位表示线程池状态,后 29 位表示 work 线程数量

// jdk11
// ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.提交任务

// jdk11
// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

对于 execute 方法:

  1. 先比较 workerCount 与 corePoolSize,若 workerCount < corePoolSize, 执行addWorker(command, true)。
  2. 若 workerCount >= corePoolSize, workQueue.offer(command)。队列的 offer 方法,如果插入成功返回 true, 失败返回 false。
  3. 若队列插入失败,队列满了,执行 addWorker(command, false)。
  4. 若addWorker(command, false) 执行失败,达到非核心线程数,执行拒绝策略。

对于 addWorker 方法:

  1. workerCount 达到最大线程数,直接返回 false。
  2. workerCount 没有达到最大线程数,创建 Worker 实例,firstTask 保存的是 task,thread 保存的是 work,即执行 thread 的 start 方法,会执行 Worker 的 run 方法。
  3. 取出 Worker 实例中的 thread,执行 start(),实际执行的是 Worker 的 run 方法。
  4. 执行 runWorker 方法
  5. 最后执行 processWorkerExit ,work 线程从 workers 中移除。

对于 runWorker 方法:

  1. 执行 Worker 的 firstTask 的 run() 方法,firstTask 是 FutureTask 实例,FutureTask 实例中的 callable,是用户实际提交的 task 实例。再循环执行 getTask 方法,从队列中获得 task。

对于 getTask 方法:

  1. 比较 workerCount 和 corePoolSize 大小,若 workerCount <= corePoolSize,使用 workQueue.take() 从队列取出 task,若队列为空,则阻塞;若 workerCount > corePoolSize,使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),从队列取出 task,若等待 keepAliveTime 后没有取到值,则返回 null。
// jdk11
// ThreadPoolExecutor.java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
// Worker.java
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

public void run() {
    runWorker(this);
}
// jdk11
// DefaultThreadFactory.java
private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

3.获得结果

任务提交给 ThreadPoolExecutor 后,ThreadPoolExecutor 执行的是 FutureTask 的 run 方法,FutureTask 实例中的 callable,是用户实际提交的 task 实例

FutureTask 的 run 方法:

  1. 执行 run 方法,进行 cas,把Thread runnernull 设置为 Thread.currentThread()
  2. 在 run 方法中,执行 FutureTask 的 Callable<V> callable 的 call 方法
  3. 进行cas 把 FutureTask 的 STATE 由 NEW 变成为 COMPLETING,然后把执行后的结果放到 Object outcome,然后把 FutureTask 的 STATE 变成为 NORMAL
  4. 执行 finishCompletion 方法,finishCompletion 从 waiters 中获得等待结果的线程,唤醒等待结果的每个线程
// jdk11
// FutureTask.java
public void run() {
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }
}

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
// jdk11
// WaitNode.java
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask 的 get 方法:

  1. 如果 FutureTask 的 state 的值小于等于 COMPLETING,则进入 awaitDone 方法
  2. awaitDone 方法中构造 waiters,waiters 是一个链表,存放所以正在等待的线程,并把要获得结果的线程进行阻塞
// jdk11
// FutureTask.java
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // The code below is very delicate, to achieve these goals:
    // - call nanoTime exactly once for each call to park
    // - if nanos <= 0L, return promptly without allocation or nanoTime
    // - if nanos == Long.MIN_VALUE, don't underflow
    // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
    //   and we suffer a spurious wakeup, we will do no worse than
    //   to park-spin for a while
    long startTime = 0L;    // Special value 0L means not yet parked
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // We may have already promised (via isDone) that we are done
            // so never return empty-handed or throw InterruptedException
            Thread.yield();
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        else if (!queued)
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        else if (timed) {
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        else
            LockSupport.park(this);
    }
}