From d1988d4db9568eab3394785e2f3e380d941de3cc Mon Sep 17 00:00:00 2001 From: likuan Date: Wed, 21 May 2025 13:46:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=AF=E5=8F=AC=E5=9B=9E=E6=89=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=89=A7=E8=A1=8C=E5=99=A8?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=8D=A2=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97?= =?UTF-8?q?=EF=BC=8C=E5=AE=8C=E5=96=84=E6=8E=A5=E5=8F=A3=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RecyclableBatchThreadPoolExecutor.java | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java index f6a3dc20f..a2aba2eb8 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/RecyclableBatchThreadPoolExecutor.java @@ -10,9 +10,12 @@ import java.util.stream.Stream; /** * 可召回批处理线程池执行器 + *
  * 1.数据分批并行处理
- * 2.线程安全,可用同时执行多个任务
- * 3.主线程、线程池混合执行,主线程空闲时会尝试召回线程池队列中的任务执行,无需担心任务阻塞
+ * 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
+ * 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
+ * 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积
+ * 
* * @author likuan */ @@ -25,8 +28,10 @@ public class RecyclableBatchThreadPoolExecutor { /** * 建议的构造方法 - * 使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 - * 假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化 + *
+	 * 1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
+	 * 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
+	 * 
* * @param poolSize 线程池大小 * @param threadPoolPrefix 线程名前缀 @@ -61,11 +66,22 @@ public class RecyclableBatchThreadPoolExecutor { executor.shutdown(); } + /** + * 获取线程池 + * @return ExecutorService + */ + public ExecutorService getExecutor(){ + return executor; + } + /** * 分批次处理数据 + *
 	 * 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果{@link Function}返回null即可
-	 * 2.异常在{@link Function}中自行处理
-	 * 3.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 2.{@link Function}需自行处理异常、保证线程安全
+	 * 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝
+	 * 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
+	 * 
* * @param 输入数据类型 * @param 输出数据类型 @@ -81,10 +97,11 @@ public class RecyclableBatchThreadPoolExecutor { } List> batches = splitData(data, batchSize); int batchCount = batches.size(); - ConcurrentLinkedQueue> taskQueue = new ConcurrentLinkedQueue<>(); + int minusOne = batchCount - 1; + ArrayDeque> taskQueue = new ArrayDeque<>(minusOne); Map>> futuresMap = new HashMap<>(); // 提交前 batchCount-1 批任务 - for (int i = 0 ; i < batchCount-1 ; i++) { + for (int i = 0 ; i < minusOne ; i++) { final int index = i; IdempotentTask task = new IdempotentTask<>(i,() -> processBatch(batches.get(index), processor)); taskQueue.add(task); @@ -92,7 +109,7 @@ public class RecyclableBatchThreadPoolExecutor { } Object[] arr = new Object[batchCount]; // 处理最后一批 - arr[batchCount-1] = processBatch(batches.get(batchCount-1), processor); + arr[minusOne] = processBatch(batches.get(minusOne), processor); // 处理剩余任务 processRemainingTasks(taskQueue, futuresMap,arr); //排序、过滤null