一、概述
CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能
CompletableFuture 是为 Java 异步编程而设计的。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后把任务的进展(成功或者失败)通知主线程。
通过这种方式,主线程不必为了等待任务完成而阻塞,可以同时去执行其他任务。这种并行方式能够显著提升程序的性能。
CompletableFuture 的优点是:
- 异步任务结束时,会自动回调某个对象的方法
- 异步任务出错时,会自动回调某个对象的方法
- 主线程设置好回调后,不再关心异步任务的执行
二、使用
方法:
- thenAccept() 处理正常结果
- exceptionally() 处理异常结果
- thenApplyAsync() 用于串行化另一个 CompletableFuture
- anyOf()和allOf() 用于并行化多个 CompletableFuture
- xxx() 表示该方法将继续在已有的线程中执行
- xxxAsync() 表示将异步在线程池中执行
/**
 * Registers a success callback ({@code thenAccept}) and a failure callback
 * ({@code exceptionally}) on the same asynchronous task and prints the name of
 * the thread each piece runs on.
 *
 * <p>The supplier below always returns normally, so in this run only the
 * {@code thenAccept} callback prints; the {@code exceptionally} callback is
 * invoked only when the task completes exceptionally.
 *
 * @throws Exception kept in the signature for compatibility with existing callers
 */
@Test
public void testException() throws Exception {
    // Start the asynchronous task.
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 100;
    });
    // Runs when the task completes normally.
    CompletableFuture<Void> onSuccess = future.thenAccept((result) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept", threadName));
    });
    // Runs when the task completes exceptionally; supplies null as the fallback value.
    CompletableFuture<Integer> onFailure = future.exceptionally((e) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: exceptionally", threadName));
        return null;
    });
    // Wait for both dependent stages instead of sleeping for a fixed time, so the
    // test finishes as soon as the callbacks have run and never returns before them.
    CompletableFuture.allOf(onSuccess, onFailure).join();
}
/**
 * Shows the two factory methods that start asynchronous work.
 * <ol>
 *   <li>{@code runAsync} takes a {@code Runnable} (no argument, no return value);
 *       the result type is {@code CompletableFuture<Void>}.</li>
 *   <li>{@code supplyAsync} takes a {@code Supplier} (no argument, returns a value);
 *       the result type is {@code CompletableFuture<U>}.</li>
 * </ol>
 *
 * @throws Exception if waiting for the supplied value fails or is interrupted
 */
@Test
public void testExecute() throws Exception {
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: runAsync", threadName));
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    });
    // The caller blocks here until the supplied value is available.
    System.out.println(future2.get());
    // Also wait for the Runnable task; otherwise the test may return before it prints.
    future1.join();
}
/**
 * Compares the three basic continuation methods.
 * <ol>
 *   <li>{@code thenRun} takes a {@code Runnable} (no argument, no return value);
 *       the result type is {@code CompletableFuture<Void>}.</li>
 *   <li>{@code thenAccept} takes a {@code Consumer} (one argument, no return value);
 *       the result type is {@code CompletableFuture<Void>}.</li>
 *   <li>{@code thenApply} takes a {@code Function} (one argument, returns a value);
 *       the result type is {@code CompletableFuture<U>}.</li>
 * </ol>
 *
 * @throws Exception kept in the signature for compatibility with existing callers
 */
@Test
public void testThen() throws Exception {
    CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "11";
    }).thenRun(() -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenRun", threadName));
    });
    CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    }).thenAccept((res) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept", threadName));
    });
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "33";
    }).thenApply((res) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenApply", threadName));
        return "3333";
    });
    // Printed by the calling thread while the three pipelines are still running.
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("threadName: %s", threadName));
    // Wait for all three pipelines instead of sleeping for a fixed time.
    CompletableFuture.allOf(future1, future2, future3).join();
}
/**
 * Chains one future onto another with {@code thenCompose} and prints the value
 * of the composed future.
 *
 * <p>Returning {@code ageFuture} from {@code thenApply} instead would produce a
 * nested {@code CompletableFuture<CompletableFuture<Integer>>} that has to be
 * unwrapped with {@code join().join()}; {@code thenCompose} flattens that into a
 * single {@code CompletableFuture<Integer>}.
 */
@Test
public void testThenCompose() {
    CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "zhang san";
    });
    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 18;
    });
    // Use thenCompose when the continuation itself returns a CompletableFuture
    // and the caller wants a flattened result.
    CompletableFuture<Integer> future2 = nameFuture.thenCompose(name -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenCompose", threadName));
        return ageFuture;
    });
    // join() blocks until the composed future has a value.
    System.out.println(future2.join());
}
/**
 * Runs two independent futures and merges their results with
 * {@code thenCombine}, then prints the combined string.
 *
 * @throws Exception if waiting for the combined result fails or is interrupted
 */
@Test
public void testThenCombine() throws Exception {
    CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "zhang san";
    });
    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 18;
    });
    // The combiner receives both results once both futures have completed.
    CompletableFuture<String> future = nameFuture.thenCombine(ageFuture, (name, age) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenCombine", threadName));
        return String.format("name: %s, age: %s", name, age);
    });
    // get() blocks the caller until the combined value is available.
    System.out.println(future.get());
}
/**
 * Demonstrates waiting on several futures at once.
 * <ol>
 *   <li>{@code allOf} completes when every given future has completed;
 *       the result type is {@code CompletableFuture<Void>}.</li>
 *   <li>{@code anyOf} completes as soon as any one of the given futures has
 *       completed; the result type is {@code CompletableFuture<Object>}.</li>
 * </ol>
 *
 * @throws Exception kept in the signature for compatibility with existing callers
 */
@Test
public void testOf() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "11";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executing thread can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    });
    // Completes with the value of whichever input future finishes first.
    CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2);
    CompletableFuture<Void> printed = future.thenAccept((result) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept result: %s", threadName, result));
    });
    // Wait for the callback and for both inputs instead of sleeping for a fixed
    // time, so the slower task still gets to print before the test returns.
    printed.join();
    CompletableFuture.allOf(future1, future2).join();
}