juc · 2022-05-31 0

CompletableFuture 使用

一、概述

CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现

CompletableFuture 的优点是:

  • 异步任务结束时,会自动回调某个对象的方法
  • 异步任务出错时,会自动回调某个对象的方法
  • 主线程设置好回调后,不再关心异步任务的执行

二、使用

方法:

  • thenAccept() 处理正常结果

  • exceptional() 处理异常结果

  • thenApplyAsync() 用于串行化另一个 CompletableFuture

  • anyOf()和allOf() 用于并行化多个 CompletableFuture

  • xxx() 表示该方法将继续在已有的线程中执行

  • xxxAsync() 表示将异步在线程池中执行

@Test
public void testException() throws Exception {
    // 创建异步执行任务
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 100;
    });

    // 如果执行成功
    future.thenAccept((result) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept", threadName));
    });

    // 如果执行异常
    future.exceptionally((e) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: exceptionally", threadName));
        return null;
    });

    TimeUnit.SECONDS.sleep(3);
}

/**
 * 1.runAsync 后面跟的是一个无参数、无返回值的方法,即 Runnable,最终的返回值是 CompletableFuture<Void> 类型
 * 2.supplyAsync 后面跟的是一个有参数、无返回值的方法,即 Supplier,最终的返回值也是 CompletableFuture<U> 类型
 */
@Test
public void testExecute() throws Exception {
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: runAsync", threadName));
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    });

    // 调用者阻塞,等待结果返回
    System.out.println(future2.get());
}

/**
 * 1.thenRun 后面跟的是一个无参数、无返回值的方法,即 Runnable,最终的返回值是 CompletableFuture<Void> 类型
 * 2.thenAccept 后面跟的是一个有参数、无返回值的方法,即 Consumer,最终的返回值也是 CompletableFuture<Void> 类型
 * 3.thenApply 后面跟的是一个有参数、有返回值的方法,即 Function。最终的返回值是 CompletableFuture<U> 类型
 */
@Test
public void testThen() throws Exception {
    CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "11";
    }).thenRun(() -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenRun", threadName));
    });

    CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    }).thenAccept((res) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept", threadName));
    });

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "33";
    }).thenApply((res) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenApply", threadName));
        return "3333";
    });

    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("threadName: %s", threadName));

    TimeUnit.SECONDS.sleep(3);
}

@Test
public void testThenCompose() {
    CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "zhang san";
    });

    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 18;
    });

//         CompletableFuture<CompletableFuture<Integer>> future1 = nameFuture.thenApply(name -> {
//             String threadName = Thread.currentThread().getName();
//             System.out.println(String.format("threadName: %s method: thenApply", threadName));
//             return ageFuture;
//         });
//         System.out.println(future1.join().join());

    // 如果希望返回值是一个展平的CompletableFuture,可以使用thenCompose
    CompletableFuture<Integer> future2 = nameFuture.thenCompose(name -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenCompose", threadName));
        return ageFuture;
    });
    System.out.println(future2.join());
}

@Test
public void testThenCombine() throws Exception {
    CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "zhang san";
    });

    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return 18;
    });

    CompletableFuture<String> future = nameFuture.thenCombine(ageFuture, (name, age) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenCombine", threadName));
        return String.format("name: %s, age: %s", name, age);
    });

    System.out.println(future.get());
}

/**
 * 1.allOf 等待所有的 CompletableFuture 结束,返回值是 CompletableFuture<Void> 类型
 * 2.anyOf 只要有任意一个 CompletableFuture 结束,就可 以做接下来的事情,返回值是 CompletableFuture<Object> 类型
 */
@Test
public void testOf() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "11";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: supplyAsync", threadName));
        return "22";
    });

    CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2);
    future.thenAccept((result) -> {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("threadName: %s method: thenAccept result: %s", threadName, result));
    });

    TimeUnit.SECONDS.sleep(3);
}