0
点赞
收藏
分享

微信扫一扫

《Java8 实战》笔记——4.CompletableFuture-组合式异步编程

君之言之 2022-03-22 阅读 28
java

第 11 章

completableFuture组合式异步编程

Future/Callable

Future接口可以构建异步应用,是多线程开发中常见的设计模式;

当我们需要调用一个函数方法时,如果这个函数执行很慢,那么我们就要进行等待;但有时候,我们可能并不急着要结果;因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求;对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据(或者回调);

(图)

sync模式 & async模式

区别于Runnable/run()方法,submit后立即放回一个future对象,随时可以通过future.get方法获得返回结果(如果call方法没跑完,get会阻塞当前其他线程(包括main线程),直到get到一个值),也可以使用future.get(1, TimeUnit.SECONDS)的方式,尝试在指定时间内获得值,只阻塞固定时间;并且,call方法是可以抛异常的,而run方法不可以;

Future/Callable的使用

@Test
public void func_01() throws ExecutionException, InterruptedException {//Callable/Future的使用
//通过执行器**ExecutorService**的**submit**方法;
  ExecutorService execute = Executors.newCachedThreadPool();
  long start = System.nanoTime();
  Future<String> future = execute.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
      Thread.sleep(5000);
      return "work1-done:" + System.currentTimeMillis();
    }
  });
  // System.out.println(future.get());//get会阻塞后面的操作
  try {
    System.out.println(future.get(1, TimeUnit.SECONDS));//get会阻塞后面的操作,最多阻塞1s,1s内得不到结果会放弃;
    System.out.println(future.isDone());
  } catch (TimeoutException e) {
    e.printStackTrace();
    System.out.println("1s内未等到执行结果.");
  }
  try {
    Thread.sleep(1500);
    System.out.println("work2-done" + System.currentTimeMillis());
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println(future.get());
  System.out.println(future.isDone());
  long end = System.nanoTime();
  System.out.println("total compute time: " + (end - start) / 1000000 + " ms");//输出:5002 ms
}
@Test
public void func_02() throws ExecutionException, InterruptedException {//一个轮询的情景
//定义一个异步任务
  ExecutorService execute = Executors.newCachedThreadPool();
  Future<String> future = execute.submit(() -> {
    Thread.sleep(2000);
    return "hello world";
  });
  //轮询获取结果
  while (true) {
    if (future.isDone()) {
      System.out.println("等到了结果.");
      System.out.println(future.get());
    break;
    }
  }
}

Future/Callable的局限

Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:
(1)将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果);
(2)等待Future集合中的所有任务都完成;
(3)仅等待Future集合中最快结束的任务完成,并返回它的结果;

completableFuture相对于Future,有点类似于Stream相对于Collection,他们都使用了Lambda+流水线操作的思想;

*小测试:

eg:给出一个情景——查询多个在现商店,根据给定的产品/服务,找出最低价格;在这个过程中,可能两个异步操作是依赖关系,例如查到价格后会紧跟一个折扣计算,即将查价格-查折扣两个连续的异步操作合并;以响应式的方法处理异步操作的完成事件,即持续更新每种商品的最佳推荐,而不是等待所有的商店都返回各自的价格;

同步/异步?

  • 同步:调用方法,等待这个方法的结果,等到后在继续执行后续的代码,也称为阻塞式调用;
  • 异步:被调用的方法会立即返回一个值,即可以继续执行后续的代码,这个返回值和被调用的方法线程"绑定",也称为非阻塞式调用;返回线程计算结果的方式可以是"回调函数",或是调用者开启个线程采用"轮询"的方式;

eg:

package cn.ActionInJ8;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * Created by on 2019/9/3.
 */
//情景:查多个商店的某种指定商品的价格,给出最优(最低价格)的推荐;

public class Shop {

    public String getShopName() {
        return shopName;
    }

    String shopName;

    public Shop(String name) {
        shopName = name;
    }

    public double getPrice(
            String productName) {//根据名字查价格,查询每个商店都需要一定时间
        try {
            Thread.sleep(1500);//模拟花时间去查询
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Random().nextDouble() * productName.charAt(0) + productName.charAt(1);//模拟价格-根据名字String随机产生
    }

    public Future<Double> getPriceAsync_01(
            String productName) {//异步查询价格
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> futurePrice.complete(getPrice(productName))).start();//complete方法
        return futurePrice;
    }

    public Future<Double> getPriceAsync(String productName) {//使用工厂方法创建CompletableFuture,传入一个supplier
        return CompletableFuture.supplyAsync(() -> getPrice(productName));//进一步精简代码
    }

    //测试用例 硬件条件: cpu-6核心
    //商店列表-6个
    static List<Shop> shops1 = Arrays.asList(
            new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"));
    //商店列表-12个
    static List<Shop> shops2 = Arrays.asList(
            new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"),
            new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J"), new Shop("K"), new Shop("L"));
    //商店列表-24个
    static List<Shop> shops3 = Arrays.asList(
            new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"),
            new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J"), new Shop("A"), new Shop("B"),
            new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"),
            new Shop("I"), new Shop("J"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J"));

    public static List<String> findPrice_01(
            String productName) {//Stream-阻塞式
        return shops3.stream()
                .map(shop -> String.format("%s priceis% .2f", shop.getShopName(), shop.getPrice(productName))).collect(Collectors.toList());
    }

    public static List<String> findPrice_02(String productName) {//并行流-阻塞式
        return shops3.parallelStream()
                .map(shop -> String.format("%s priceis% .2f", shop.getShopName(), shop.getPrice(productName)))
                .collect(Collectors.toList());
    }

    public static List<CompletableFuture<String>>

    findPrice_03(String productName) {//Stream-非阻塞式,只得到一个CompletableFuture<String>的List,这个值还没有算出来就会返回;
        return shops3.stream().map(shop ->CompletableFuture.supplyAsync
                (() -> String.format("%s priceis% .2f", shop.getShopName(), shop.getPrice(productName)))).
                collect(Collectors.toList());
    }

    public static List<String> findPrice_04(String productName) {//上面方法的补充:Stream-非阻塞式,将每个shop映射成一个关于shop的Future(这个future存放一个String类型的结果),再将Future的值取(get/join)出来,get抛异常/join不抛异常
        List<CompletableFuture<String>> futures = shops3.stream().parallel().map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s priceis% .2f", shop.getShopName(), shop.getPrice(productName)))).collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    //*下面来自定义自己的CompletableFuture的执行器;有个计算公式:让CPU满载100%,等待时间占比约100,CPU核心数6,差不多线程理论值上线600,但是只有24个商店,所以没必要这么多,24个即可;
    private static final Executor myExecutor = Executors.newFixedThreadPool(Math.min(100, shops3.size()),//为了避免过多的线程搞死计算机,这里设置一个线程数上限100,设太多了可能吧CPU搞懵逼了 2333~
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);//设置t为"守护线程",即标记当前线程为守护线程后,当前线程会陪伴到最后一个"用户进程"结束时,自动结束自身(不管有没有运行完),下面会给详细说明; //注意: 你现在正创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的线程,如果最后一个线程需要很长时间运行而其他用户线程早早就结束了,所以最后剩下的那个线程会由于一直等待无法发生的事件而长时间的占用CPU资源,从而引发问题。 //与此相反,如果将线程标记为守护进程,意味着程序(最后一个用户进程)退出时它也会被回收。这二者之间没有性能上的差异。
                    return t;
                }
            }
    );

    //*自定义了Executor,改进的CompletableFuture
    public static List<String> findPrice_05(String productName) {//上面方法的补充:Stream-非阻塞式,将每个shop映射成一个关于shop的Future(这个future存放一个String类型的结果),再将Future的值取(get/join)出来
        List<CompletableFuture<String>> futures = shops3.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s priceis% .2f", shop.getShopName(), shop.getPrice(productName)), myExecutor)).collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // long start1 = System.nanoTime();
        // System.out.println(findPrice_01("VIVO-IQOO-5G"));
        // System.out.println("流-阻塞式 用时 " + (System.nanoTime() - start1) / 1000000 + " ms");//线性执行,24*一个任务的时间

        long start2 = System.nanoTime();
        System.out.println(findPrice_02("VIVO-IQOO-5G"));
        System.out.println("并行流-阻塞式 用时 " + (System.nanoTime() - start2) / 1000000 + " ms");//6倍的提升,并行!

        // long start3 = System.nanoTime();
        // System.out.println(findPrice_03("VIVO-IQOO-5G"));
        // System.out.println("流-非阻塞式1 用时 " + (System.nanoTime() - start3) / 1000000 + " ms");//用时非常短,因为还没有算出来就会返回Future列表

        long start4 = System.nanoTime();
        System.out.println(findPrice_04("VIVO-IQOO-5G"));
        System.out.println("流-非阻塞式2 用时 " + (System.nanoTime() - start4) / 1000000 + " ms");//发现结果和并行流不相上下!

        //原因:其内部原理都是使用了相同的线程池,并且核心线程数大小等于当前可用的CPU核心数量;但是——CompletableFuture的优势是这个数量可以自己配置以满足实际需要,而并行流不行!

        long start5 = System.nanoTime();
        System.out.println(findPrice_05("VIVO-IQOO-5G"));
        System.out.println("流-非阻塞式2 用时 " + (System.nanoTime() - start5) / 1000000 + " ms");//成倍的提升(4倍,多4倍的核心线程数)-改进的异步CompletableFuture!
    }

}

这里来回顾一下线程池的定义方法(最完整的构造函数)?

package cn.ActionInJ8;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by on 2019/9/3.
 */

public class ThreadPoolTest_01 {

    public static void main(String[] args) throws InterruptedException, IOException {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 0L;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(4);//(最大任务数+阻塞队列长度)- 总执行任务数=被拒绝的任务
        //**流程**:**先给核心池,核心池满了则将任务放进阻塞队列(阻塞队列满了,才会去判断当前总任务数与最大线程池数的关系,若小于,则会new线程去执行(与核心池一样,阻塞队列的则等待被执行);若大于maxsize,则拒绝执行)**
        ThreadFactory threadFactory = new NameTreadFactory();
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue, threadFactory, handler);
        executor.prestartAllCoreThreads(); // 预启动所有核心线程

        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }

        System.in.read(); //阻塞主线程
        //**结果解释**:8个被执行2个被拒绝,其中,创建了4个线程Thread(最大池数),先执行4个任务,5s后最后一波任务一次性执行4个;
        //**前2个任务直接进入核心池,后来的4个进入阻塞队列,再来了2个(2核心+2=4max)则new新线程去执行,最后俩因为超过max限制了,被reject**;
    }

    static class NameTreadFactory implements ThreadFactory { //定义线程创建的工厂**

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }

        private void doLog(Runnable r, ThreadPoolExecutor e) {
            //** 自定义拒绝策略,可做日志记录等**
            System.err.println(r.toString() + " rejected");
            //System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
        }
    }

    static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(this.toString() + " is running!");
                Thread.sleep(5000); //**让任务执行慢点**
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }

守护线程/用户线程?

守护线程与普通线程的唯一区别是:当JVM中所有的线程都是守护线程的时候,JVM就可以退出了;如果还有一个或以上的非守护线程(用户线程)则不会退出。(以上是针对正常退出,调用System.exit则必定会退出)

守护线程在没有用户线程可服务时自动离开,在Java中比较特殊的线程是被称为守护(Daemon)线程的低级别线程。这个线程具有最低的优先级,用于为系统中的其它对象和线程提供服务。将一个用户线程设置为守护线程的方式是在线程对象创建之前调用线程对象的setDaemon方法。典型的守护线程例子是JVM中的系统资源自动回收线程,我们所熟悉的Java垃圾回收线程就是一个典型的守护线程,当我们的程序中不再有任何运行中的Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是Java虚拟机上仅剩的线程时,Java虚拟机会自动离开。

从字面上很容易将守护线程理解成是由虚拟机(virtual machine)在内部创建的,而用户线程则是自己所创建的。事实并不是这样我们可以手动设置任何线程都可以是“守护线程Daemon”或“用户线程User”。他们在几乎每个方面都是相同的(包括性能),唯一区别是判断虚拟机何时离开:

  • 用户线程:Java虚拟机在它所有非守护线程已经离开后自动离开。
  • 守护线程:守护线程则是用来服务用户线程的,如果没有其他用户线程在运行,那么就没有可服务对象,也就没有理由继续下去。

setDaemon(boolean on)方法可以方便的设置线程的Daemon模式,true为Daemon模式,false为User模式。setDaemon(boolean on)方法必须在线程启动之前调用,当线程正在运行时调用会产生异常。isDaemon方法将测试该线程是否为守护线程。值得一提的是,当你在一个守护线程中产生了其他线程,那么这些新产生的线程不用设置Daemon属性,都将是守护线程,用户线程同样。

eg:

package cn.ActionInJ8;

/**
* Created by on 2019/9/3.
*/
public class TestDaemon extends Thread {
public TestDaemon() { }

public void run() {//**定义的守护线程**
  for (int i = 0; i < 100; i++) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    System.out.println(i);
  }
  System.out.println("finished.");
}

public static void main(String args[]) {
  TestDaemon test = new TestDaemon();
  test.**setDaemon**(true);**//false,要在运行之前设置**
  test.start();
  System.out.println("isDaemon=" + test.isDaemon());
  // try {//**长时间阻塞,导致 守护线程 虽然计算完了,但是结束不了**
  // [System.in](http://system.in/).read();
  // System.out.println(test.isAlive());
  // } catch (IOException e) {
  // // TODO Auto-generated catch block
  // e.printStackTrace();
  // }

  new Thread(()->{
    try {
      Thread.sleep(1000);//短时间阻塞,导致守护线程虽然没算完,但是跟这个线程(最后一个用户线程)一起结束,“finished.”打印不出来
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println(test.getState());
  }).start();

  }
}

*选择正确的线程池大小?

《Java并发编程实战》中给出如下公式:Number = NCpu * Ucpu * ( 1 + W/C)

  • Number : (理论)核心线程数量
  • NCpu : 处理器核数
  • UCpu : 期望cpu利用率
  • W/C : 等待时间与计算时间比

例如: 99%的时间是等待响应(线程IO时间/线程CPU时间) W/C = 99 ,cpu利用率期望 100% ,NCpu = 6,推断出 number = 600。但是为了避免过多的线程搞死计算机,实际会比理论计算值小一些。

*思考:到底是使用并行流还是异步?(因为流的使用相对容易,代码简洁)

流的使用简单;而异步提供了更多的灵活性,可以灵活调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。

(1)如果进行的是计算密集型的操作(计算结果依赖每个CPU核的计算能力),并且没有I/O,那么推荐使用Stream接口,因为实现简单、逻辑清晰,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。

(2)反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以像前文讨论的那样,依据 W/C(等待/计算)的比率设定需要使用的线程数(不依赖CPU的计算能力,因此可以把核心线程数设的高于CPU核心数)。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

*本章总结

最后归纳一下的知识点,包括几个核心方法的使用;

  • 什么是CompletableFuture?

CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future,CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

  • Future vs CompletableFuture

Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成

  • *Future的主要缺点如下

(1)不支持手动完成
这个意思指的是,我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。

(2)不支持进一步的非阻塞调用
这个指的是我们通过Future的get方法会一直阻塞到任务完成,但是我还想在获取任务之后,执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能

(3)不支持链式调用
这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。

(4)不支持多个Future合并
比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。

(5)不支持异常处理
Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

****如何创建?结合流一起使用?对Future结果的处理?**(管道操作、合并结果、异常处理);详细代码及讲解如下

package cn.ActionInJ8;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by on 2019/9/4.
 */
//测试CompletableFuture的几个核心方法(没有性能测试)——创建异步、管道处理、合并结果
public class TestCompletableFuture_01 {
    static String getThreadName() {
        return "当前线程 " + Thread.currentThread().getName() + " ";
    }

    //测试-创建CompletableFuture,得到结果
    static String doSthLong() {
        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "OK";
    }

    @Test
    public void func_01() throws ExecutionException, InterruptedException {//创建的2种方法
        //(1)
        CompletableFuture future = new CompletableFuture();
        future.complete(doSthLong());
        System.out.println(future.get());
        //(2)使用工厂创建-一般放到流的处理中
        Object result = CompletableFuture.supplyAsync(TestCompletableFuture_01::doSthLong).get();//supplier
        System.out.println(getThreadName() + " 被阻塞,直到得到结果:" + result);//
    }

    //配合Stream的使用
    static int doubleX(int i) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i * 2;
    }

    @Test
    public void func_02() {//效果和parallelStream类似,但略有不同
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Integer>> list2 = list.stream().map(i -> CompletableFuture.supplyAsync(() -> doubleX(i))).collect(Collectors.toList());
        System.out.println(list2);//[Not completed]
        //收集结果
        List<Integer> list3 = list2.stream().map(CompletableFuture::join).collect(Collectors.toList());//join/get都行,join不抛异常
        System.out.println(list2);//[Completed normally]
        System.out.println(list3);//输出结果
    }

    //接下来测试异步的管道操作——即对异步结果进一步处理(如映射map)

    //(1)thenApply()
    //类似于scala和spark的map算子,通过这个方法可以进行多次链式转化并返回最终的加工结果,一般用在流的.map中,即对future做映射,将future结果的值t(Future<T>)作为一个函数的参数传入,做映射;
    @Test
    public void func_03() {
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Integer>> list2 = list.stream()
                .map(i -> CompletableFuture.supplyAsync(() -> doubleX(i)))
                .map(future -> future.thenApply(TestCompletableFuture_01::doubleX))//对future的值做映射得到新future,这一步结果类型为Stream<CompletableFuture<Integer>>
                .collect(Collectors.toList());
        System.out.println(list2.parallelStream().map(CompletableFuture::join).collect(Collectors.toList()));//结果*2*2=*4
    }

    //(2)thenAccept()
    //这个方法与thenApply类似,可以接受Futrue的一个返回值,但是本身不在返回任何值,适合用于多个callback函数的最后一步操作使用。
    static void getTriple(int i) {
        System.out.println("answer: " + i * 3);
    }

    @Test
    public void func_04() {
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Void>> list2 = list.stream()//注意这里的类型,因为没有返回值,是CompletableFuture<Void>
                .map(i -> CompletableFuture.supplyAsync(() -> doubleX(i)))
                .map(future -> future.thenAccept(TestCompletableFuture_01::getTriple))//对future的值做映射得到新future,这一步结果类型为Stream<CompletableFuture<Void>>
                .collect(Collectors.toList());
        list2.parallelStream().map(CompletableFuture::join).count();//加count为了终结流,*2*3=*6
    }

    //(3)thenRun()
    //这个方法与上一个方法类似,一般也用于回调函数最后的执行,但这个方法不接受回调函数的返回值,纯粹就代表执行任务的最后一个步骤;
    //这个方法与thenApply类似,可以接受Futrue的一个返回值,但是本身不在返回任何值,适合用于多个callback函数的最后一步操作使用。
    static void Informed() {
        System.out.println("answer has benn gotten.");
    }

    @Test
    public void func_05() {
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Void>> list2 = list.stream()//注意这里的类型,因为没有返回值,是CompletableFuture<Void>
                .map(i -> CompletableFuture.supplyAsync(() -> doubleX(i)))
                .map(future -> future.thenRun(TestCompletableFuture_01::Informed))//对future的值做映射得到新future,这一步结果类型为Stream<CompletableFuture<Void>>
                .collect(Collectors.toList());
        list2.parallelStream().map(CompletableFuture::join).count();//加count为了终结流,*2*3=*6
    }

    //这里注意,截止到目前,上面的例子代码只会涉及两个线程,一个是主线程一个是ForkJoinPool池的线程(合并结果操作会共用一个线程)
    // 但其实上面的每一步都是支持异步运行的,即连接两个Async异步,分别用ForkJoinPool池中的不同的2个线程去分别处理,但是如果第二步操作依赖第一步,就没必要异步;
    /*
    // thenApply() variants 其api如下:
    <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    */
    //(4)异步的thenApplyAsync-thenApplyAsync
    @Test
    public void func_06() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future0 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);//防止future0执行完了,导致其他future复用当前线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getThreadName() + " supplyAsync: 任务0");
            return null;
        });
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getThreadName() + " supplyAsync: 任务1");
            return null;
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getThreadName() + " supplyAsync: 任务2");
            return null;
        });

        CompletableFuture<String> future3 = future2.thenApplyAsync(value -> {//thenApplyAsync与future2在不同一个线程
            System.out.println(getThreadName() + " thenApplyAsync: 任务2的子任务1");
            return null;
        });
        CompletableFuture<Void> future4 = future2.thenAcceptAsync(value -> {//thenAcceptAsync与future2在不同一个线程
            System.out.println(getThreadName() + " thenAcceptAsync: 任务2的子任务2");
        });
        CompletableFuture<Void> future5 = future2.thenRunAsync(() -> {//thenRunAsync为什么与future2在同一个线程?因为一个6个线程,1个main5个worker,6个任务有一个恢复用线程
            System.out.println(getThreadName() + " thenRunAsync: 任务2的子任务3");
        });
        Thread.sleep(3500);
        System.out.println(getThreadName() + future3.join() + future4.get() + future5.join());
    }
    //还有一点需要注意:ForkJoinPool所有的工作线程都是守护模式的,也就是说如果主线程退出,那么整个处理任务都会结束,而不管你当前的任务是否执行完。

    //以上是对单个CompletableFuture的操作,可以合并两个CompletableFutures的执行结果

    //(1)CompletableFutures在执行两个依赖的任务合并时,会返回一个嵌套的结果列表,为了避免这种情况我们可以使用thenCompose来返回,直接获取最顶层的结果数据(有点像flatMap的感觉);
    //??既然是对2个CompletableFuture的合并,第二个依赖第一个,那和thenApplyAsync有何区别?是不是可能一种情况:第二个异步可能不仅依赖第一个future1的值,还依赖一个耗时的计算?
    @Test
    public void func_07() {
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Integer>> list2 = list.stream()
                .map(i -> CompletableFuture.supplyAsync(() -> doubleX(i)))
                .map(future -> future.thenCompose(valueOfFuture -> CompletableFuture.supplyAsync(() -> valueOfFuture + 10)))//对future的值做映射得到新future,再将future与一个依赖它的future"合并",这一步结果类型为Stream<CompletableFuture<Integer>>
                .collect(Collectors.toList());
        System.out.println(list2.parallelStream().map(CompletableFuture::join).collect(Collectors.toList()));//结果*2+10
    }

    //thenCombine合并两个没有依赖关系的CompletableFutures任务
    @Test
    public void func_08() {
        List<Integer> list = Arrays.asList(8, 3, 9, 7, 7, 1, 9, 9, 5);
        List<CompletableFuture<Integer>> list2 = list.stream()
                .map(i -> CompletableFuture.supplyAsync(() -> doubleX(i)))
                .map(future -> future.thenCombine(CompletableFuture.supplyAsync(() -> {
                    return 20;
                }), (value1, value2) -> value1 + value2))//对future的值做映射得到新future,再将future与一个不依赖它独立的future"合并",将他们的value合并处理返回,这一步结果类型为Stream<CompletableFuture<Integer>>
                .collect(Collectors.toList());
        System.out.println(list2.parallelStream().map(CompletableFuture::join).collect(Collectors.toList()));//结果*2+20
    }

    //合并多个任务的结果allOf与anyOf
    //上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。

    //(1)allOf适用于,你有一系列独立的future任务,你想等其所有的任务执行完后做一些事情。举个例子,比如我想下载100个网页,传统的串行,性能肯定不行,这里我们采用异步模式,同时对100个网页进行下载,当所有的任务下载完成之后,我们想判断每个网页是否包含某个关键词。
    //下面我们通过随机数来模拟上面的这个场景如下:
    //??思考:既然是合并无关的多个CompletableFuture,与thenCombine()有什么区别,仅仅是数量不一样吗?
    @Test
    public void func_09() throws ExecutionException, InterruptedException {
        //添加n个任务
        CompletableFuture<Double>[] array = new CompletableFuture[3];
        for (int i = 0; i < 3; i++) {
            array[i] = CompletableFuture.supplyAsync(new Supplier<Double>() {
                @Override
                public Double get() {
                    double r = Math.random();
                    System.out.println(r);
                    return r;
                }
            });
        }

        //获取结果的方式一
        System.out.println("方式1:");

        CompletableFuture[] cc = array;//不报错
//        CompletableFuture[] cc= CompletableFuture.allOf(array);//报错

//        System.out.println(CompletableFuture.allOf(array).join());
//        for (CompletableFuture<Double> cf : array) {
//            if (cf.get() > 0.6) {
//                System.out.println(cf.get());
//            }
//        }
//        System.out.println("方式2:");
        //获取结果的方式二,过滤大于指定数字,在收集输出
//        List<Double> rs = Stream.of(array).map(CompletableFuture::join).filter(number -> number > 0.6).collect(Collectors.toList());
//        System.out.println(rs);
    }
    //注意其中的join方法和get方法类似,仅仅在于在Future不能正常完成的时候抛出一个unchecked的exception,这可以确保它用在Stream的map方法中,直接使用get是没法在map里面运行的。

    //(2)anyOf方法,也比较简单,意思就是只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束。
    @Test
    public void func_10() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    TimeUnit.SECONDS.sleep(4);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "wait 4 seconds";
            }
        });
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "wait 2 seconds";
            }
        });

        CompletableFuture<String> f3 = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    TimeUnit.SECONDS.sleep(6);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "wait 6 seconds";
            }
        });
        CompletableFuture<Object> result = CompletableFuture.anyOf(f1, f2, f3);//只选择其中一个,即最先执行完的那一个future
        System.out.println(result.get());
    }

    //异常处理是异步计算的一个重要环节,下面看看如何在CompletableFuture中使用:
    //(1)exceptionally异常处理-配合throw
    @Test
    public void func_11() throws ExecutionException, InterruptedException {
        int age = -1;//int age=10
        CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                if (age < 0) {
                    throw new IllegalArgumentException("性别必须大于0");//throw new XXException()若注释掉,则exceptionally就捕获不了异常;
                }
                if (age < 18) {
                    return "未成年人";
                }
                return "成年人";
            }
        }).exceptionally(ex -> {//传入一个Function——有返回值
            return "发生 异常" + ex.getMessage();//这个return会覆盖get里面的return,若发生异常,则看不到异常值;
        });
        System.out.println(task.get());
    }

    //(2)此外还有另外一种异常捕捉方法handle,与上面一样,配合throw;区别上面的无论发生异常都会执行,示例如下:
    @Test
    public void func_12() throws ExecutionException, InterruptedException {
        int age = 10;//int age=-1;
        CompletableFuture<String> task = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                if (age < 0) {
//                    throw new IllegalArgumentException("性别必须大于0");//注释掉,handle也会执行,只不过exception捕获不到了
                }
                if (age < 18) {
                    return "未成年人";
                }
                return "成年人";
            }
        }).handle((result, ex) -> {//传入一个BiFuction,有一个返回值;
            System.out.println("执行handle");
            if (ex != null) {
                return "发生异常" + ex.getMessage();
            }
            return result;//handle必须有返回值,上面的return包在if{}里面;而且,这个return会覆盖get方法里面的return,这里"return result"可以看到异常值;
        });
        System.out.println(task.get());
    }
    /*
    JDK9 CompletableFuture 类增强的主要内容

    (1)支持对异步方法的超时调用
    orTimeout()
    completeOnTimeout()

    (2)支持延迟调用
    Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
    Executor delayedExecutor(long delay, TimeUnit unit)
     */
}

 

举报

相关推荐

0 条评论