Splitting a List and processing its data concurrently with a thread pool

In our business, some messages were consumed from MQ successfully, but the call to the downstream system failed; those records need to be re-sent once and the result written back to the record. An interface is provided that reprocesses previously failed records by start and end time, but with a single thread large data volumes were processed very slowly, so a thread pool is used to start multiple threads that redo the calls and update the business result.
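
For context, the retry entry point takes a start time, an end time, and the name of the downstream API whose calls failed. A minimal sketch of what such an endpoint could look like (the RetryController class, the /retry mapping, and the ConsumerCrmRecordService name are assumptions for illustration; only retryRecord itself comes from the code below):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RetryController { // hypothetical controller, for illustration only

    @Autowired
    private ConsumerCrmRecordService consumerCrmRecordService; // assumed service holding retryRecord

    // e.g. GET /retry?start=...&end=...&apiName=... (date format must match ConcurrentDateUtil.parse)
    @GetMapping("/retry")
    public Result retry(@RequestParam("start") String startDateStr,
                        @RequestParam("end") String endDateStr,
                        @RequestParam("apiName") String apiName) {
        return consumerCrmRecordService.retryRecord(startDateStr, endDateStr, apiName);
    }
}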

Define the thread pool:

package com.rocketmq.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Author:fanxin
 * @Package:com.rocketmq.consumer.config
 * @Project:bright-rocketmq-eshine
 * @name:ThreadPoolConfig
 * @Date:2023/12/26 9:28
 */
@Configuration
@Slf4j
public class ThreadPoolConfig {

    // Number of available CPU cores + 1
    private static final int threadNum = Runtime.getRuntime().availableProcessors() + 1;

    // Core pool size
    public static int corePoolSize = threadNum;

    // Maximum pool size
    private int maxPoolSize = 2 * threadNum;

    // Idle time a pool thread may stay alive: 1 minute
    private int keepAliveSeconds = 60;

    // Maximum queue capacity
    private int queueCapacity = 1024;

    // Thread name prefix
    private static final String threadNamePrefixName = "Async-Retry-Service-";

    /**
     * Custom thread pool
     * @return the configured executor
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor createThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Maximum pool size
        executor.setMaxPoolSize(maxPoolSize);
        // Core pool size
        executor.setCorePoolSize(corePoolSize);
        // Keep-alive time (seconds)
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Thread name prefix
        executor.setThreadNamePrefix(threadNamePrefixName);
        // Capacity of the blocking queue
        executor.setQueueCapacity(queueCapacity);
        // Rejection policy: do not run the task on a new thread, run it on the caller's thread instead
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Wait for all tasks to finish before shutting the pool down
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Initialize
        executor.initialize();
        log.info("Initialized thread pool (threadPoolTaskExecutor), core pool size: " + corePoolSize);
        return executor;
    }

}
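
Because the pool is registered as a Spring bean named threadPoolTaskExecutor, the usual way to use it is to inject that bean rather than call the factory method directly. A minimal usage sketch, assuming a hypothetical AsyncRetryDemoService class that is not part of the original code:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class AsyncRetryDemoService { // hypothetical class, for illustration only

    // Inject the bean defined above instead of constructing a new executor
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void submit() {
        // The task runs on a pool thread whose name starts with "Async-Retry-Service-"
        threadPoolTaskExecutor.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
    }
}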

Calling it from the service:

public Result retryRecord(String startDateStr, String endDateStr, String apiName) {
    JSONObject count = new JSONObject();
    Date startDate = ConcurrentDateUtil.parse(startDateStr);
    Date endDate = ConcurrentDateUtil.parse(endDateStr);
    // Query the records that need to be retried from MongoDB by time range
    List<ConsumerCrmRecord> crmRecords = consumerCrmRecordRepository.findByCreateTimeBetweenAndApiNameAndStatus(startDate, endDate, apiName, 0);
    // Total number of records
    count.put("total", crmRecords.size());
    // Counter for successful records
    int successCount = 0;
    // Split the list into batches of 500
    List<List<ConsumerCrmRecord>> partition = Lists.partition(crmRecords, 500);
    int batchNum = partition.size();
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // Get the thread pool bean (with default proxyBeanMethods the @Configuration proxy returns the singleton)
    ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolConfig.createThreadPoolTaskExecutor();
    // Collect the futures so they can be joined later
    List<CompletableFuture<Integer>> results = new ArrayList<>();
    // Iterate over the batches
    for (List<ConsumerCrmRecord> batchConsumers : partition) {
        // Each batch is handled by one pool thread
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(
                () -> this.batchRetryList(batchConsumers), threadPoolTaskExecutor);
        results.add(future);
    }
    // Wait for every batch to finish; if the main thread does not need to wait, this join can be skipped
    CompletableFuture.allOf(results.toArray(new CompletableFuture[batchNum])).join();
    // Sum the per-batch success counts; join() returns immediately here because allOf() has already completed
    for (CompletableFuture<Integer> future : results) {
        successCount += future.join();
    }
    stopWatch.stop();
    log.info("Total time: " + stopWatch.getTotalTimeMillis() + " ms");
    count.put("success", successCount);
    return Result.success(count);
}
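
Two details are worth calling out. Lists.partition is Guava's helper that views a list as consecutive sublists of the given size (the last batch may be shorter), and the per-batch results are only joined after CompletableFuture.allOf(...).join(), so the batches genuinely run in parallel; joining each future inside the submission loop would serialize the work. A small standalone sketch of this partition-and-join pattern, with made-up numbers and a hypothetical PartitionJoinDemo class:

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionJoinDemo {
    public static void main(String[] args) {
        // 12 fake record ids, split into batches of 5 -> [0..4], [5..9], [10..11]
        List<Integer> ids = IntStream.range(0, 12).boxed().collect(Collectors.toList());
        List<List<Integer>> batches = Lists.partition(ids, 5);

        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Submit one task per batch; each task returns the number of ids it "processed"
        List<CompletableFuture<Integer>> futures = batches.stream()
                .map(batch -> CompletableFuture.supplyAsync(batch::size, pool))
                .collect(Collectors.toList());

        // Wait for all batches, then aggregate the per-batch results
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        int total = futures.stream().mapToInt(CompletableFuture::join).sum();
        System.out.println("processed " + total + " ids in " + batches.size() + " batches");

        pool.shutdown();
    }
}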

The business method executed for each batch:

public Integer batchRetryList(List<ConsumerCrmRecord> batchConsumers) {
    log.info("Processing retry batch batchRetryList: {}, {}", Thread.currentThread().getName(), UUID.randomUUID().toString());
    AtomicInteger successCount = new AtomicInteger();
    batchConsumers.forEach(consumerCrmRecord -> {
        // Re-send the original message to the downstream system
        JSONObject result = restTemplateUtil.post(url + consumerPath, consumerCrmRecord.getMessage().toJSONString(), new HashMap<>());
        consumerCrmRecord.setResponse(result);
        consumerCrmRecord.setUpdateTime(new Date());
        if (result.getJSONObject("data") != null) {
            Boolean error = result.getJSONObject("data").getBoolean("error");
            // Only count the record as successful when the downstream explicitly reports no error
            if (Boolean.FALSE.equals(error)) {
                consumerCrmRecord.setStatus(1);
                successCount.getAndIncrement();
            }
        }
        // Persist the updated record regardless of the outcome
        consumerCrmRecordRepository.save(consumerCrmRecord);
    });
    return successCount.get();
}

Note:

When calling a downstream system from multiple threads, size the thread pool according to your own configuration, but also take into account the downstream system's throughput and the amount of concurrency it can sustain.
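
One simple way to respect such a limit is to cap the number of in-flight downstream calls independently of the pool size. The sketch below is only an illustration of that idea, assuming a hypothetical DownstreamLimiter class, a permit count of 4, and a placeholder callDownstream method; none of these appear in the original code:

import java.util.concurrent.Semaphore;

public class DownstreamLimiter {

    // Hypothetical cap on concurrent downstream calls; tune it to what the downstream can sustain
    private static final int MAX_DOWNSTREAM_CONCURRENCY = 4;
    private final Semaphore permits = new Semaphore(MAX_DOWNSTREAM_CONCURRENCY);

    // Wraps a downstream call so that at most MAX_DOWNSTREAM_CONCURRENCY run at the same time,
    // no matter how many pool threads are submitting work
    public String callWithLimit(String payload) throws InterruptedException {
        permits.acquire();
        try {
            return callDownstream(payload);
        } finally {
            permits.release();
        }
    }

    // Placeholder for the real HTTP call (e.g. restTemplateUtil.post in the service above)
    private String callDownstream(String payload) {
        return "ok:" + payload;
    }
}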