Splitting a List and Processing the Chunks Concurrently with a Thread Pool
The business requirement: for records whose MQ messages were consumed successfully but whose downstream call failed, re-send the request and update the record with the new result. An interface already existed that reprocesses the failed records between a given start and end time, but with a single thread it was far too slow for large volumes. The fix is to split the record list into batches and use a thread pool to re-issue the calls and update the business results concurrently.
Define the thread pool:
```java
package com.rocketmq.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class ThreadPoolConfig {

    // One thread per CPU core plus one: a reasonable default for mixed I/O + CPU work
    private static final int threadNum = Runtime.getRuntime().availableProcessors() + 1;
    private final int corePoolSize = threadNum;
    private final int maxPoolSize = 2 * threadNum;
    private final int keepAliveSeconds = 60;
    private final int queueCapacity = 1024;
    private static final String threadNamePrefixName = "Async-Retry-Service-";

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor createThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefixName);
        executor.setQueueCapacity(queueCapacity);
        // When the queue is full, run the task on the submitting thread instead of rejecting it
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        log.info("Initialized thread pool (threadPoolTaskExecutor), core pool size: " + corePoolSize);
        return executor;
    }
}
```
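Spring's `ThreadPoolTaskExecutor` delegates to a plain `java.util.concurrent.ThreadPoolExecutor`, so the effect of `CallerRunsPolicy` can be shown without Spring. A minimal sketch (the class name and the deliberately tiny pool/queue sizes are made up for illustration): when the single worker is busy and the one-slot queue is full, the next submitted task runs synchronously on the caller's thread rather than being rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CallerRunsDemo {
    public static void main(String[] args) throws Exception {
        // 1 worker, queue of 1: the third task cannot be queued and falls back to the caller
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch block = new CountDownLatch(1);
        AtomicBoolean ranOnCaller = new AtomicBoolean(false);
        String callerName = Thread.currentThread().getName();

        // Task 1 occupies the only worker thread until the latch is released
        pool.execute(() -> { try { block.await(); } catch (InterruptedException ignored) {} });
        // Task 2 fills the one-slot queue
        pool.execute(() -> {});
        // Task 3 is rejected by the pool, so CallerRunsPolicy executes it right here
        pool.execute(() -> ranOnCaller.set(
                Thread.currentThread().getName().equals(callerName)));

        block.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("third task ran on caller thread: " + ranOnCaller.get());
        // → third task ran on caller thread: true
    }
}
```

This back-pressure behavior is why `CallerRunsPolicy` is a sensible choice for the retry job: a full queue slows the submitter down instead of dropping retries.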
Calling it from the service:
```java
public Result retryRecord(Date startDate, Date endDate, String apiName) {
    JSONObject count = new JSONObject();
    // Load the records that were consumed from MQ but failed downstream (status = 0)
    List<ConsumerCrmRecord> crmRecords = consumerCrmRecordRepository
            .findByCreateTimeBetweenAndApiNameAndStatus(startDate, endDate, apiName, 0);
    count.put("total", crmRecords.size());

    // Split the records into batches of 500
    List<List<ConsumerCrmRecord>> partition = Lists.partition(crmRecords, 500);

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // Submit one task per batch; do NOT call join() inside this loop,
    // or each batch would block the next and the batches would run serially.
    // threadPoolTaskExecutor is the injected bean defined above; calling
    // threadPoolConfig.createThreadPoolTaskExecutor() here would create a new pool per request.
    List<CompletableFuture<Integer>> results = new ArrayList<>();
    for (List<ConsumerCrmRecord> batchConsumers : partition) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(
                () -> this.batchRetryList(batchConsumers), threadPoolTaskExecutor);
        results.add(future);
    }

    // Wait for every batch to finish, then sum the per-batch success counts
    CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
    int successCount = results.stream().mapToInt(CompletableFuture::join).sum();

    stopWatch.stop();
    log.info("Total time: " + stopWatch.getTotalTimeMillis() + " ms");
    count.put("success", successCount);
    return Result.success(count);
}
```
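The partition-then-`allOf` pattern above can be sketched with the plain JDK (no Guava, no Spring). In this self-contained version, the `partition` helper mimics `Lists.partition`, and `batch::size` stands in for `batchRetryList`'s per-batch success count; all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PartitionRetryDemo {
    // Plain-JDK equivalent of Guava's Lists.partition: views of size-`size` sublists
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    public static void main(String[] args) {
        // 100 integers stand in for the failed records
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 100; i++) records.add(i);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (List<Integer> batch : partition(records, 30)) {
            // One task per batch; join only AFTER all tasks have been submitted
            futures.add(CompletableFuture.supplyAsync(batch::size, pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        int successCount = futures.stream().mapToInt(CompletableFuture::join).sum();
        pool.shutdown();
        System.out.println("batches=" + futures.size() + " success=" + successCount);
        // → batches=4 success=100   (batches of 30, 30, 30, 10)
    }
}
```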
The per-batch business method:
```java
public Integer batchRetryList(List<ConsumerCrmRecord> batchConsumers) {
    log.info("Processing retry batch batchRetryList: {}, {}",
            Thread.currentThread().getName(), UUID.randomUUID().toString());
    AtomicInteger successCount = new AtomicInteger();
    batchConsumers.forEach(consumerCrmRecord -> {
        // Re-send the original MQ message payload to the downstream system
        JSONObject result = restTemplateUtil.post(url + consumerPath,
                consumerCrmRecord.getMessage().toJSONString(), new HashMap<>());
        consumerCrmRecord.setResponse(result);
        consumerCrmRecord.setUpdateTime(new Date());
        if (result.getJSONObject("data") != null) {
            boolean error = result.getJSONObject("data").getBoolean("error");
            if (!error) {
                // Mark the record as successfully retried
                consumerCrmRecord.setStatus(1);
                successCount.getAndIncrement();
            }
        }
        consumerCrmRecordRepository.save(consumerCrmRecord);
    });
    return successCount.get();
}
```
Note:
When using multiple threads to call a downstream system, size the thread pool not only from your own machine's configuration but also with the downstream system's throughput and sustainable concurrency in mind.
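One common way to enforce such a downstream limit is a `Semaphore` that caps in-flight calls independently of the pool size. A minimal sketch, where the limit of 3 and the `sleep` standing in for the downstream HTTP call are illustrative assumptions: the pool has 8 threads, but at most `maxDownstreamConcurrency` calls run at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledRetryDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical limit: downstream tolerates at most 3 concurrent calls
        final int maxDownstreamConcurrency = 3;
        Semaphore permits = new Semaphore(maxDownstreamConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();   // highest observed concurrency

        ExecutorService pool = Executors.newFixedThreadPool(8); // pool larger than the limit
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            futures.add(pool.submit(() -> {
                permits.acquire();                  // block until a downstream slot is free
                try {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);               // stands in for the downstream HTTP call
                    inFlight.decrementAndGet();
                } finally {
                    permits.release();
                }
                return null;
            }));
        }
        for (Future<?> f : futures) f.get();        // propagate any task failure
        pool.shutdown();
        System.out.println("peak concurrent downstream calls: " + peak.get());
    }
}
```

Acquiring the permit in the worker (rather than shrinking the pool) keeps the pool free for other task types while still protecting the one fragile dependency.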