Up to maximumPoolSize + queue capacity tasks can be held by the pool at once (running or queued) without any of them being rejected.
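The rule above can be checked with a small sketch. `PoolCapacityDemo` and `countAccepted` are hypothetical names introduced here for illustration. Every task blocks on a latch so that nothing completes while tasks are being submitted, and the helper assumes corePoolSize is at least 1 (with corePoolSize = 0 the count can vary with timing).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolCapacityDemo {
    // Hypothetical helper: submits `submitted` tasks that all block until released
    // and returns how many the pool accepted. Assumes core >= 1.
    static int countAccepted(int core, int max, int queueCapacity, int submitted) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    executor.execute(() -> awaitQuietly(release));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: all threads are busy and the queue is full.
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();
        }
        return accepted;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // maximumPoolSize 10 + queue capacity 5
        System.out.println(countAccepted(1, 10, 5, 20)); // prints 15
    }
}
```

Note that the limit applies to tasks in flight at the same moment; once tasks complete, more can be submitted.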
1. The core thread runs the tasks
@Test
public void testQueue1() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                            // corePoolSize
            10,                           // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)   // bounded queue, capacity 5
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
    // Wait long enough for every task to finish and for idle threads above corePoolSize to time out.
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 4, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 3, Completed Tasks: 1
3 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 2, Completed Tasks: 2
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 3
5 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 4
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 5
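Analysis:
Task 1: run directly by the core thread.
Tasks 2, 3, 4, 5: placed in the queue to wait. The queue holds 5 tasks, so it never fills and no non-core thread is created.
Each time the core thread finishes a task, it takes the next one from the queue.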
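The point of this first test is that the pool does not grow past corePoolSize while the queue still has room. Here is a minimal sketch that makes this observable, using the same pool settings as testQueue1. `QueueBeforeGrowthDemo` and `snapshot` are hypothetical names, and the tasks block on a latch so the snapshot is taken before anything completes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueBeforeGrowthDemo {
    // Hypothetical helper: submits `tasks` blocking tasks (at most 15 for this pool)
    // and returns {poolSize, queueSize} observed right after submission.
    static int[] snapshot(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 10, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {executor.getPoolSize(), executor.getQueue().size()};
        } finally {
            release.countDown();   // unblock the tasks so the pool can shut down
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] five = snapshot(5);    // 1 running on the core thread, 4 queued
        int[] seven = snapshot(7);   // queue full after task 6, so task 7 adds a thread
        System.out.println(five[0] + " thread(s), " + five[1] + " queued");
        System.out.println(seven[0] + " thread(s), " + seven[1] + " queued");
    }
}
```

With 5 tasks the pool stays at one thread even though maximumPoolSize is 10; a second thread appears only once the queue is full.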
2. Core and non-core threads run the tasks
@Test
public void testQueue2() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                            // corePoolSize
            10,                           // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2)   // bounded queue, capacity 2
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
    // Wait long enough for every task to finish and for idle threads above corePoolSize to time out.
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
1 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
5 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
4 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 1, Completed Tasks: 1
3 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 5
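Analysis:
Task 1: run directly by the core thread.
Tasks 2 and 3: placed in the queue to wait.
Tasks 4 and 5: the queue is full (capacity 2), so a non-core thread is created for each of them.
As threads finish, they take tasks 2 and 3 from the queue. By the time the final line is printed, the two non-core threads have been idle for longer than keepAliveTime (10 s) and have exited, so Pool Size is back to 1.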
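The order of decisions seen in tests 1 and 2 (core thread, then queue, then non-core thread, then rejection) can be written down as a small pure function. This is a simplified model for illustration only, not the JDK implementation: `ExecuteRouting`, `Route`, `route`, and `simulate` are hypothetical names, and the model ignores shutdown state and the special handling needed when corePoolSize is 0.

```java
import java.util.ArrayList;
import java.util.List;

class ExecuteRouting {
    enum Route { NEW_CORE_THREAD, QUEUED, NEW_NON_CORE_THREAD, REJECTED }

    // Simplified model of where a newly submitted task goes.
    static Route route(int poolSize, int queued,
                       int corePoolSize, int maximumPoolSize, int queueCapacity) {
        if (poolSize < corePoolSize) {
            return Route.NEW_CORE_THREAD;       // step 1: fill the core threads first
        }
        if (queued < queueCapacity) {
            return Route.QUEUED;                // step 2: then fill the queue
        }
        if (poolSize < maximumPoolSize) {
            return Route.NEW_NON_CORE_THREAD;   // step 3: then grow up to the maximum
        }
        return Route.REJECTED;                  // step 4: otherwise hand to the rejection policy
    }

    // Replays `tasks` submissions, assuming no task finishes in the meantime.
    static List<Route> simulate(int tasks, int core, int max, int capacity) {
        List<Route> routes = new ArrayList<>();
        int poolSize = 0;
        int queued = 0;
        for (int i = 0; i < tasks; i++) {
            Route r = route(poolSize, queued, core, max, capacity);
            if (r == Route.QUEUED) {
                queued++;
            } else if (r != Route.REJECTED) {
                poolSize++;
            }
            routes.add(r);
        }
        return routes;
    }

    public static void main(String[] args) {
        // Same settings as testQueue2: core 1, max 10, queue capacity 2, 5 tasks.
        System.out.println(simulate(5, 1, 10, 2));
        // prints [NEW_CORE_THREAD, QUEUED, QUEUED, NEW_NON_CORE_THREAD, NEW_NON_CORE_THREAD]
    }
}
```

Calling `simulate(5, 1, 10, 5)` reproduces testQueue1 (one core thread, four queued tasks), and `simulate(5, 1, 1, 1)` reproduces the rejection tests (two accepted, three rejected).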
3. Non-core threads run the tasks
@Test
public void testQueue3() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0,                            // corePoolSize: no core threads
            10,                           // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)   // bounded queue, capacity 5
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
    // Wait long enough for every task to finish and for idle threads above corePoolSize to time out.
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 4, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 3, Completed Tasks: 1
3 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 2, Completed Tasks: 2
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 3
5 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 4
thread: main, Pool Size: 0, Active Threads: 0 Queue Size: 0, Completed Tasks: 5
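Analysis:
Task 1: corePoolSize = 0, so no core thread can be created. The task is placed in the queue first; because the pool has no threads at all, the executor then starts one non-core thread, which takes the task from the queue.
Tasks 2, 3, 4, 5: placed in the queue to wait.
Each time the thread finishes a task, it takes the next one from the queue. Once the queue is empty the thread idles past keepAliveTime and exits, which is why the final Pool Size is 0.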
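The final lines of testQueue1 and testQueue3 differ only in Pool Size (1 versus 0), because by default only threads above corePoolSize are subject to keepAliveTime. A minimal sketch of that difference follows. `KeepAliveDemo` and `poolSizeAfterIdle` are hypothetical names, and the 50 ms keep-alive and 2 s polling deadline are arbitrary values chosen to keep the example fast.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Hypothetical helper: runs one trivial task, then waits up to 2 s for the pool
    // to become empty and returns the pool size it ends up with.
    static int poolSizeAfterIdle(int corePoolSize) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize, 10, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
        try {
            executor.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (executor.getPoolSize() > 0 && System.nanoTime() < deadline) {
                TimeUnit.MILLISECONDS.sleep(20);
            }
            return executor.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return executor.getPoolSize();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterIdle(0)); // the only thread is non-core and times out
        System.out.println(poolSizeAfterIdle(1)); // the core thread stays alive while idle
    }
}
```

If core threads should also time out, `ThreadPoolExecutor.allowCoreThreadTimeOut(true)` enables that behaviour.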
@Test
public void testQueue4() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0,                            // corePoolSize: no core threads
            10,                           // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2)   // bounded queue, capacity 2
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        executor.execute(() -> {
            try {
                System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                        finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                        executor.getQueue().size(), executor.getCompletedTaskCount());
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
    // Wait long enough for every task to finish and for idle threads above corePoolSize to time out.
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
4 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
3 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
1 thread: pool-1-thread-1, Pool Size: 3, Active Threads: 3 Queue Size: 2, Completed Tasks: 0
2 thread: pool-1-thread-3, Pool Size: 3, Active Threads: 3 Queue Size: 1, Completed Tasks: 1
5 thread: pool-1-thread-2, Pool Size: 3, Active Threads: 3 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 0, Active Threads: 0 Queue Size: 0, Completed Tasks: 5
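Analysis (for the run shown; with corePoolSize = 0, which tasks are queued and which get a new thread depends on timing):
Task 1: corePoolSize = 0, so the task is placed in the queue first; because the pool has no threads, one non-core thread (pool-1-thread-1) is started and takes it from the queue.
Task 2: placed in the queue to wait.
Tasks 3 and 4: the queue is full at that moment, so a non-core thread is created for each (pool-1-thread-2 and pool-1-thread-3).
Task 5: by now pool-1-thread-1 has taken task 1 out of the queue, so there is a free slot and task 5 is queued.
As threads finish, they take tasks 2 and 5 from the queue. All threads are non-core, so they exit after keepAliveTime and the final Pool Size is 0.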
4. Rejection policy: AbortPolicy
@Test
public void testAbortPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                            // corePoolSize
            1,                            // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),  // bounded queue, capacity 1
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            // AbortPolicy throws RejectedExecutionException from execute()
            System.err.println(e.getMessage());
        }
    }
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
Task org.example.MyTest$$Lambda$1/943010986@7382f612 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
Task org.example.MyTest$$Lambda$1/943010986@3caeaf62 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
Task org.example.MyTest$$Lambda$1/943010986@e6ea0c6 rejected from java.util.concurrent.ThreadPoolExecutor@1055e4af[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 2
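Analysis:
Task 1: run directly by the core thread.
Task 2: placed in the queue to wait.
Tasks 3, 4, 5: the queue is full and the pool is already at maximumPoolSize (1), so each task is handed to the rejection policy.
Once the thread finishes task 1, it takes task 2 from the queue.
The pool completes 2 tasks and rejects 3.
AbortPolicy (the default): throws a RejectedExecutionException to the caller of execute(). If the caller does not catch it, the exception propagates and interrupts the normal flow of the program.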
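Two points from this section can be checked in a short sketch: that AbortPolicy is what a pool uses when no handler is passed, and that the rejection surfaces as an exception at the `execute()` call site, so the caller knows exactly which tasks were refused. `AbortPolicyDemo` and its methods are hypothetical names, and the tasks block on a latch so the outcome does not depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortPolicyDemo {
    // True if a pool built without an explicit handler uses AbortPolicy.
    static boolean defaultHandlerIsAbortPolicy() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        try {
            return executor.getRejectedExecutionHandler() instanceof ThreadPoolExecutor.AbortPolicy;
        } finally {
            executor.shutdown();
        }
    }

    // Submits tasks 1..5 to a pool with one thread and one queue slot
    // and returns the ids of the tasks whose submission threw.
    static List<Integer> rejectedIds() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> rejected = new ArrayList<>();
        try {
            for (int id = 1; id <= 5; id++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected.add(id);   // the caller decides what to do: log, retry later, fail fast
                }
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(defaultHandlerIsAbortPolicy()); // prints true
        System.out.println(rejectedIds());                 // prints [3, 4, 5]
    }
}
```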
5. Rejection policy: DiscardPolicy
@Test
public void testDiscardPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                            // corePoolSize
            1,                            // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),  // bounded queue, capacity 1
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardPolicy()
    );
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            // Not reached with DiscardPolicy: rejected tasks are dropped silently
            System.err.println(e.getMessage());
        }
    }
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 2
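Analysis:
Task 1: run directly by the core thread.
Task 2: placed in the queue to wait.
Tasks 3, 4, 5: the queue is full and the pool is already at maximumPoolSize (1), so each task is handed to the rejection policy.
Once the thread finishes task 1, it takes task 2 from the queue.
The pool completes 2 tasks and discards 3.
DiscardPolicy: silently drops the rejected task; it neither runs the task nor throws an exception. If losing tasks is acceptable, this is the simplest option.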
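Because DiscardPolicy leaves no trace of the dropped tasks, it can be useful to at least count them. The sketch below is a variation on the idea, not part of the JDK: `CountingDiscardPolicy` is a hypothetical handler that implements the real `RejectedExecutionHandler` interface and drops tasks the same way, while keeping a counter.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: discards rejected tasks like DiscardPolicy, but counts them.
class CountingDiscardPolicy implements RejectedExecutionHandler {
    private final AtomicInteger discarded = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        discarded.incrementAndGet();   // no exception, and the task is never run
    }

    int discardedCount() {
        return discarded.get();
    }

    // Submits 5 blocking tasks to a pool with one thread and one queue slot
    // and returns how many were discarded.
    static int demo() {
        CountingDiscardPolicy policy = new CountingDiscardPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 1; i <= 5; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return policy.discardedCount();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

The count matches the test above: tasks 3, 4, and 5 are dropped, and `execute()` returns normally for all five submissions.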
6. Rejection policy: CallerRunsPolicy
@Test
public void testCallerRunsPolicy() throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,                            // corePoolSize
            1,                            // maximumPoolSize
            10L,                          // keepAliveTime
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),  // bounded queue, capacity 1
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    for (int i = 1; i <= 5; i++) {
        System.out.printf("Submitting task %s %n", i);
        int finalI = i;
        try {
            executor.execute(() -> {
                try {
                    System.out.printf("%s thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
                            finalI, Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
                            executor.getQueue().size(), executor.getCompletedTaskCount());
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
    TimeUnit.SECONDS.sleep(15);
    System.out.printf("thread: %s, Pool Size: %s, Active Threads: %s Queue Size: %s, Completed Tasks: %s %n",
            Thread.currentThread().getName(), executor.getPoolSize(), executor.getActiveCount(),
            executor.getQueue().size(), executor.getCompletedTaskCount());
    executor.shutdown();   // release the pool's threads once the test is done
}
Result:
Submitting task 1
Submitting task 2
Submitting task 3
1 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
3 thread: main, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 0
2 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 1
Submitting task 4
Submitting task 5
5 thread: main, Pool Size: 1, Active Threads: 1 Queue Size: 1, Completed Tasks: 1
4 thread: pool-1-thread-1, Pool Size: 1, Active Threads: 1 Queue Size: 0, Completed Tasks: 2
thread: main, Pool Size: 1, Active Threads: 0 Queue Size: 0, Completed Tasks: 3
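Analysis:
Task 1: run directly by the core thread.
Task 2: placed in the queue to wait.
Task 3: rejected, so the policy hands it back to the submitting thread, main, which runs it itself. While main is busy running the task it cannot continue the for loop; the loop resumes only after main has finished task 3.
Tasks 4 and 5 are then submitted: task 4 goes into the now-empty queue, and task 5 is rejected and again runs on main.
The pool completes 3 tasks (1, 2, 4) and main runs 2 tasks (3, 5).
CallerRunsPolicy: a "caller runs" throttling mechanism. It neither drops tasks nor throws an exception; instead it pushes the rejected tasks back to the caller, which slows down the rate at which new tasks are submitted.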
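To make "the rejected task runs on the caller" directly observable, the sketch below records the name of the thread that ran each task. `CallerRunsDemo` and `run` are hypothetical names. Task 1 blocks on a latch so that the single pool thread stays busy and the single queue slot stays occupied when task 3 is submitted.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Returns a map from task id to the name of the thread that ran it.
    static Map<Integer, String> run() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 1 occupies the only pool thread until released.
            executor.execute(() -> {
                ranOn.put(1, Thread.currentThread().getName());
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Task 2 fills the only queue slot.
            executor.execute(() -> ranOn.put(2, Thread.currentThread().getName()));
            // Task 3 is rejected, so execute() runs it on the calling thread before returning.
            executor.execute(() -> ranOn.put(3, Thread.currentThread().getName()));
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        // Tasks 1 and 2 report a pool thread; task 3 reports "main".
        run().forEach((id, thread) -> System.out.println(id + " ran on " + thread));
    }
}
```

One consequence worth keeping in mind: the submitting thread is blocked for as long as the rejected task takes, which is what provides the back-pressure but can be a problem if that thread has other duties.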