如何使用CompletableFuture.supplyAsync实现异步操作(复杂型)
实现的代码封装在function中,也有简单的说明,如下:
public static void useCompletableFuture_complicated() {
// 这个方法时描述一般地使用CompletableFuture实现异步操作,即复杂的使用CompletableFuture实现异步操作
// 假设我们有一个Person名字List
List<String> personNameList = new ArrayList<>();
// 为了方便测试,我们要构造大量的数据add到personNameList,用for循环,名字就是1, 2, 3, ...
// 这里添加1000个名字到personNameList
for (int i = 0; i < 1000; i++) {
personNameList.add(String.valueOf(i));
}
// 假设我们要做的业务是personNameList里的每个人都说一句Hello World, 但是我们不关心他们说这句话的顺序,而且我们希望这个业务能够较快速的完成,所以采用异步就是比较合适的
// 先创建两个活动线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 开始我们的业务处理
for (String personName : personNameList) {
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
// 模拟业务逻辑,say hello world
System.out.println(personName + ": Hello World!");
return "task finished!";
}
}, executor);
}
// 关闭线程池executor
// 说明一下executor必须要显示关闭(它的方法里有介绍),不然线程池会一直等待任务,会导致main方法一直运行
// 还有就是关闭executor,不会导致之前提交的异步任务被打断或者取消。即之前提交的任务依然会执行到底,只是不会再接收新的任务
executor.shutdown();
/* 那么关闭线程池之后,我们怎么确定我们的任务是否都完成了呢,可以使用executor.isTerminated()命令
// 可以看看isTerminated这个方法的说明,简单的说就是调用isTerminated()方法之前没有调用shutdown()方法的话,那么isTerminated()方法返回的永远是false。
// 所以isTerminated()方法返回true的情况就是在调用isTerminated()方法之前要先调用shutdown()方法,且所有的任务都完成了。
// 其实调用isTerminated()的目的就是我们对异步任务的结果是care, 我们需要等待异步任务的结果以便我们做下一步的动作。
// 如果我们不关心异步任务的结果的话,完全可以不用调用isTerminated()。
*/
while (!executor.isTerminated()) {
System.out.println("no terminated");
try {
System.out.println("我要休眠一下");
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如何使用CompletableFuture.supplyAsync实现异步操作(简洁型)
简洁的代码如下:
public static void useCompletableFuture_simple() {
// 这个方法时描述利用1.8新特性,简单使用CompletableFuture实现异步操作
// 先创建两个活动线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
List<String> nameList = new ArrayList<String>();
for (int i = 0; i < 1000; i++) {
nameList.add(String.valueOf(i));
}
// 使用JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
nameList.stream().forEach(name -> CompletableFuture.supplyAsync((Supplier<String>) () -> {
print((String) name); // 封装了业务逻辑
return "success";
}, executor).exceptionally(e -> {
System.out.println(e);
return "false";
}));
executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("no terminated");
try {
System.out.println("我要休眠一下");
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
两种方法输出结果一致(不要钻牛角尖)
比喻:会输出1000个: x: Hello World!
x的范围是0-999,它们之间是没有顺序,中间会有’no terminated’和’我要休眠一下’这两句话。
分析
使用JDK1.8新特性代码简洁了很多,格局看起来有上来了。
注意点
Supplier< String >(多了空格是因为无空格不显示)的使用我的无法去掉,我看有的文章不需要Supplier,猜测是JDK版本不一致导致。区别如下:
CompletableFuture.supplyAsync((Supplier<String>) () -> {
CompletableFuture.supplyAsync(() -> {
- 使用异步其实也是与线程有关系,所以要关注自己的线程是否及时关闭,以免造成内存泄漏。
更新1
日期:2021-8-14
以上两个例子使用的线程池都是默认的,查看源码:
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可知newFixedThreadPool用的是LinkedBlockingQueue,即阻塞的链表队列,再通过源码看看这个队列的默认大小。
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
默认大小是Integer.MAX_VALUE,2^31次方,2147483648,21亿多,可以认为是无限大。即这个任务队列是无界的。如果这个时候每个任务的执行时间是非常长,又不断的加任务进去,就会因为线程处理不完,导致内存飙升。
那如何修改呢?这个我目前也在寻找答案。有结果定会更新出来。