java · 2025-05-31 0

ThreadPoolExecutor 执行任务的顺序

可以提交的任务数量是 maximum pool size 加上 queue size,不会被拒绝

1.核心线程运行任务

@Test
public void testQueue1() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            10,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 4, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 3, Completed Tasks: 1
3 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 2, Completed Tasks: 2
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 3
5 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 4
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 5

分析:
第 1 个任务:直接由核心线程执行
第 2,3,4,5 个任务:进入队列排队等待执行
待核心线程执行完毕,再从队列中取出任务执行

2.核心和非核心线程运行任务

@Test
public void testQueue2() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            10,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2)
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

1 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
5 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
4 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 1, Completed Tasks: 1
3 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 5

分析:
第 1 个任务:直接由核心线程执行
第 2,3 个任务:进入队列排队等待执行
第 4,5 个任务,创建非核心线程执行

3.非核心线程运行任务

@Test
public void testQueue3() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0,
            10,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 4, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 3, Completed Tasks: 1
3 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 2, Completed Tasks: 2
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 3
5 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 4
thread: main, Pool Size: 0, Active Threads: 0 Queue Size: 0, Completed Tasks: 5

分析:
第 1 个任务:没有核心线程(corePoolSize = 0),也没有任何现有线程,会创建一个新的临时线程来执行该任务
第 2,3,4,5 个任务:进入队列排队等待执行
待线程执行完毕,再从队列中取出任务执行

@Test
public void testQueue4() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0,
            10,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2)
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

4 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
3 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
1 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
2 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 1, Completed Tasks: 1
5 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 0, Active Threads: 0 Queue Size: 0, Completed Tasks: 5

分析:
第 1 个任务:没有核心线程(corePoolSize = 0),也没有任何现有线程,会创建一个新的临时线程来执行该任务
第 2,3 个任务:进入队列排队等待执行
第 4,5 个任务,创建非核心线程执行
待线程执行完毕,再从队列中取出任务执行

4.拒绝策略 AbortPolicy

@Test
public void testAbortPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            1,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

Task org.example.MyTest$$Lambda$1/943010986@7382f612 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
Task org.example.MyTest$$Lambda$1/943010986@3caeaf62 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
Task org.example.MyTest$$Lambda$1/943010986@e6ea0c6 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 2

分析:
第 1 个任务:直接由核心线程执行
第 2 个任务:进入队列排队等待执行
第 3 个任务:线程执行拒绝策略
第 4 个任务:线程执行拒绝策略
第 5 个任务:线程执行拒绝策略
待线程执行完毕,再从队列中取出任务执行

线程池池完成 2 个任务,拒绝了 3 个任务
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

5.拒绝策略 DiscardPolicy

@Test
public void testDiscardPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            1,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardPolicy()
    );

    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 2

分析:
第 1 个任务:直接由核心线程执行
第 2 个任务:进入队列排队等待执行
第 3 个任务:线程执行拒绝策略
第 4 个任务:线程执行拒绝策略
第 5 个任务:线程执行拒绝策略
待线程执行完毕,再从队列中取出任务执行

线程池池完成 2 个任务,丢弃 3 个任务
DiscardPolicy:直接丢弃任务,不予任务处理也不抛弃异常,如果允许任务丢失,这是最好的一种方案

6.拒绝策略 CallerRunsPolicy

@Test
public void testCallerRunsPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            1,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    for (int i = 1; i <= 5; i++) {
        System.out.printf("提交第 %s 个 %n", i);
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
}

结果:

提交第 1 个
提交第 2 个
提交第 3 个
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
3 thread: main, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
提交第 4 个
提交第 5 个
5 thread: main, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 1
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 3

分析:
第 1 个任务:直接由核心线程执行
第 2 个任务:进入队列排队等待执行
第 3 个任务:线程执行拒绝策略,返回到 main 线程执行,此时 main 线程在执行任务,所以 main 没有继续执行 for 循环,main 方法执行完,才继续执行 for 循环
继续提交第 4,5 个任务,

线程池完成 3 个任务,main 线程完成 2 个任务
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量