可召回批处理线程池执行器,更换任务队列,完善接口文档

This commit is contained in:
likuan 2025-05-21 13:46:51 +08:00
parent 2d7a64f660
commit d1988d4db9

View File

@ -10,9 +10,12 @@ import java.util.stream.Stream;
/**
* 可召回批处理线程池执行器
* <pre>
* 1.数据分批并行处理
* 2.线程安全可用同时执行多个任务
* 3.主线程线程池混合执行主线程空闲时会尝试召回线程池队列中的任务执行无需担心任务阻塞
* 2.主线程线程池混合执行批处理任务主线程空闲时会尝试召回线程池队列中的任务执行
* 3.线程安全可用同时执行多个任务线程池满载时效率与单线程模式相当无阻塞风险无脑提交任务即可
* 4.适合批量处理数据且需要同步结束的场景能一定程度上提高吞吐量防止任务堆积
* </pre>
*
* @author likuan
*/
@ -25,8 +28,10 @@ public class RecyclableBatchThreadPoolExecutor {
/**
* 建议的构造方法
* 使用无界队列主线程会召回队列中的任务执行不会有任务堆积无需考虑拒绝策略
* 假如在web场景中请求量过大导致oom不使用此工具也会有同样的结果甚至更严重应该对请求做限制或做其他优化
* <pre>
* 1.使用无界队列主线程会召回队列中的任务执行不会有任务堆积无需考虑拒绝策略
* 2.假如在web场景中请求量过大导致oom不使用此工具也会有同样的结果甚至更严重应该对请求做限制或做其他优化
* </pre>
*
* @param poolSize 线程池大小
* @param threadPoolPrefix 线程名前缀
@ -61,11 +66,22 @@ public class RecyclableBatchThreadPoolExecutor {
executor.shutdown();
}
/**
* 获取线程池
* @return ExecutorService
*/
public ExecutorService getExecutor(){
return executor;
}
/**
* 分批次处理数据
* <pre>
* 1.所有批次执行完成后会过滤null并返回合并结果保持输入数据顺序不需要结果{@link Function}返回null即可
* 2.异常在{@link Function}中自行处理
* 3.主线程会参与处理批次数据如果要异步执行任务请使用普通线程池
* 2.{@link Function}需自行处理异常保证线程安全
* 3.原始数据在分片后可能被外部修改导致批次数据不一致如有必要传参之前进行数据拷贝
* 4.主线程会参与处理批次数据如果要异步执行任务请使用普通线程池
* </pre>
*
* @param <T> 输入数据类型
* @param <R> 输出数据类型
@ -81,10 +97,11 @@ public class RecyclableBatchThreadPoolExecutor {
}
List<List<T>> batches = splitData(data, batchSize);
int batchCount = batches.size();
ConcurrentLinkedQueue<IdempotentTask<R>> taskQueue = new ConcurrentLinkedQueue<>();
int minusOne = batchCount - 1;
ArrayDeque<IdempotentTask<R>> taskQueue = new ArrayDeque<>(minusOne);
Map<Integer,Future<ResultWarp<R>>> futuresMap = new HashMap<>();
// 提交前 batchCount-1 批任务
for (int i = 0 ; i < batchCount-1 ; i++) {
for (int i = 0 ; i < minusOne ; i++) {
final int index = i;
IdempotentTask<R> task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor));
taskQueue.add(task);
@ -92,7 +109,7 @@ public class RecyclableBatchThreadPoolExecutor {
}
Object[] arr = new Object[batchCount];
// 处理最后一批
arr[batchCount-1] = processBatch(batches.get(batchCount-1), processor);
arr[minusOne] = processBatch(batches.get(minusOne), processor);
// 处理剩余任务
processRemainingTasks(taskQueue, futuresMap,arr);
//排序过滤null