欢迎光临
一个学习的网站

使用CompletableFuture.supplyAsync实现异步操作

如何使用CompletableFuture.supplyAsync实现异步操作(复杂型)

实现的代码封装在function中,也有简单的说明,如下:

    public static void useCompletableFuture_complicated() {
        // 这个方法时描述一般地使用CompletableFuture实现异步操作,即复杂的使用CompletableFuture实现异步操作
        
        // 假设我们有一个Person名字List
        List<String> personNameList = new ArrayList<>();
        
        // 为了方便测试,我们要构造大量的数据add到personNameList,用for循环,名字就是1, 2, 3, ...
        
        // 这里添加1000个名字到personNameList
        for (int i = 0; i < 1000; i++) {
            personNameList.add(String.valueOf(i));
        }
        
        // 假设我们要做的业务是personNameList里的每个人都说一句Hello World, 但是我们不关心他们说这句话的顺序,而且我们希望这个业务能够较快速的完成,所以采用异步就是比较合适的
     
        // 先创建两个活动线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 开始我们的业务处理
        for (String personName : personNameList) {
            CompletableFuture.supplyAsync(new Supplier<String>() {
                @Override
                public String get() {
                    // 模拟业务逻辑,say hello world
                    System.out.println(personName + ": Hello World!");
                    return "task finished!";
                }
            }, executor);
        }
        
        // 关闭线程池executor
        // 说明一下executor必须要显示关闭(它的方法里有介绍),不然线程池会一直等待任务,会导致main方法一直运行
        // 还有就是关闭executor,不会导致之前提交的异步任务被打断或者取消。即之前提交的任务依然会执行到底,只是不会再接收新的任务
        executor.shutdown();
        
        /* 那么关闭线程池之后,我们怎么确定我们的任务是否都完成了呢,可以使用executor.isTerminated()命令
        // 可以看看isTerminated这个方法的说明,简单的说就是调用isTerminated()方法之前没有调用shutdown()方法的话,那么isTerminated()方法返回的永远是false。
        // 所以isTerminated()方法返回true的情况就是在调用isTerminated()方法之前要先调用shutdown()方法,且所有的任务都完成了。
        // 其实调用isTerminated()的目的就是我们对异步任务的结果是care, 我们需要等待异步任务的结果以便我们做下一步的动作。
        // 如果我们不关心异步任务的结果的话,完全可以不用调用isTerminated()。
        */ 
        while (!executor.isTerminated()) {
            System.out.println("no terminated");
            try {
                System.out.println("我要休眠一下");
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }

如何使用CompletableFuture.supplyAsync实现异步操作(简洁型)

简洁的代码如下:

        public static void useCompletableFuture_simple() {
        // 这个方法时描述利用1.8新特性,简单使用CompletableFuture实现异步操作
        
        // 先创建两个活动线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
                        
        List<String> nameList = new ArrayList<String>();
        
        for (int i = 0; i < 1000; i++) {
            nameList.add(String.valueOf(i));
        }

        // 使用JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
        nameList.stream().forEach(name -> CompletableFuture.supplyAsync((Supplier<String>) () -> {
            print((String) name); // 封装了业务逻辑
            return "success";
        }, executor).exceptionally(e -> {
            System.out.println(e);
            return "false";
        }));
        
        executor.shutdown();        
        
        while (!executor.isTerminated()) {
            System.out.println("no terminated");
            try {
                System.out.println("我要休眠一下");
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

两种方法输出结果一致(不要钻牛角尖)
比喻:会输出1000个: x: Hello World!
x的范围是0-999,它们之间是没有顺序,中间会有’no terminated’和’我要休眠一下’这两句话。

分析
使用JDK1.8新特性代码简洁了很多,格局看起来有上来了。

注意点
Supplier< String >(多了空格是因为无空格不显示)的使用我的无法去掉,我看有的文章不需要Supplier,猜测是JDK版本不一致导致。区别如下:

CompletableFuture.supplyAsync((Supplier<String>) () -> {

CompletableFuture.supplyAsync(() -> {

  1. 使用异步其实也是与线程有关系,所以要关注自己的线程是否及时关闭,以免造成内存泄漏。

更新1

日期:2021-8-14
以上两个例子使用的线程池都是默认的,查看源码:

 /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可知newFixedThreadPool用的是LinkedBlockingQueue,即阻塞的链表队列,再通过源码看看这个队列的默认大小。

/**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

默认大小是Integer.MAX_VALUE,2^31次方,2147483648,21亿多,可以认为是无限大。即这个任务队列是无界的。如果这个时候每个任务的执行时间是非常长,又不断的加任务进去,就会因为线程处理不完,导致内存飙升。

那如何修改呢?这个我目前也在寻找答案。有结果定会更新出来。

赞(1) 打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《使用CompletableFuture.supplyAsync实现异步操作》
文章链接:https://zixijiaoshi.com/2497.html
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。

评论 抢沙发

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

登录

找回密码

注册