线程池并行计算处理list集合



前言

一般线程的创建有三种方式,直接继承Thread、实现Runnable接口、实现Callable接口。其中最差当属直接继承Thread,调用new Thread()创建的线程缺乏管理,可以随意创建、相互竞争。而后两种可以使用Executor框架的线程池来管理线程,执行任务,可以重用存在的线程,减少对象创建、消亡的开销,提供定时执行、定期执行、单线程、并发数控制等功能。

Future表示异步计算的结果。提供了一些方法来检查计算是否完成,等待其完成以及检索计算结果。只有在计算完成后才可以使用get方法检索结果,必要时将其阻塞,直到准备就绪为止。取消是通过cancel方法执行的。提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果您出于可取消性的目的而使用Future而不提供可用的结果,则可以声明Future <?>形式的类型,并作为基础任务的结果返回null。

Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行,get方法会阻塞,直到任务返回结果

Future示例

1
2
3
4
5
6
 FutureTask task = new FutureTask<List<AccountAvatarInfo>>(() -> {
List<AccountAvatarInfo> avatarInfoList = this.exportAvatarInfo(100);
return avatarInfoList;
});

List<AccountAvatarInfo> = task.get();// 通过get()方法就可以拿到计算的list集合

分段处理list

创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
final static Integer corePoolSize = 10;
final static Integer maximumPoolSize = 20;
final static Long keepAliveTime = 1L;

static ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200), //阻塞队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

线程池通过 execute() 或 submit() 开始线程任务,我们最多可以达到的任务数是 maximumPoolSize + 阻塞队列数200。

创建Task任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static class Task implements Callable<List<User>> {


private int limit;
private int offset;
private UserService userService;

public Task(int limit, int offset, UserService userService){
this.limit = limit;
this.offset = offset;
this.userService = userService;
}

@Override
public List<UserInfo> call() throws Exception {
LambdaQueryWrapper wrapper = new LambdaQueryWrapper();
wrapper.select(User::getId, User::getUserName)
// ...查询条件
wrapper.last(String.format("LIMIT %d OFFSET %d", limit, offset))
List<User> users = userService.list();
// ...可以执行其他操作然后返回要封装list
List<UserInfo> userInfos = new ArrayList<>();
// ...
return userInfos;
}
}

线程池提交任务

1
2
3
4
5
6
7
8
@Resource
private UserService userService;

List<Future<List<UserInfo>>> futures = new ArrayList<>();
// 循环提交任务
for (int i = 0; i < 20; i++) {
futures.add(executorService.submit(new Task(100, 100 * i, userService)));
}

并行计算合并list集合

1
2
3
4
5
6
7
8
List<UserInfo> result = new ArrayList<>();
for(Future<List<UserInfo>> future : futures){
// 如果任务没有完成则忙等待,future.get()同效果,获取值没有计算成就阻塞等待
while (!future.isDone());
System.out.println(future.get());
//合并操作
result.addAll(future.get());
}

参考-多线程之分段处理List集合

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  1. © 2020-2021 Lauy    湘ICP备20003709号

请我喝杯咖啡吧~

支付宝
微信