mirror of
https://gitee.com/dromara/hutool.git
synced 2025-06-28 13:34:09 +08:00
可召回批处理线程池执行器,增加处理逻辑包装类,增加包装类处理方法
This commit is contained in:
parent
d1988d4db9
commit
7a2ef283ff
@ -5,6 +5,7 @@ import java.util.concurrent.*;
|
|||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ -14,12 +15,18 @@ import java.util.stream.Stream;
|
|||||||
* 1.数据分批并行处理
|
* 1.数据分批并行处理
|
||||||
* 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
|
* 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行
|
||||||
* 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
|
* 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可
|
||||||
* 4.适合批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积
|
* </pre>
|
||||||
|
*
|
||||||
|
* 适用场景:
|
||||||
|
* <pre>
|
||||||
|
* 1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积 {@link #process(List, int, Function)}
|
||||||
|
* 2.普通查询接口加速 {@link #processByWarp(Warp[])}
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author likuan
|
* @author likuan
|
||||||
*/
|
*/
|
||||||
public class RecyclableBatchThreadPoolExecutor {
|
public class RecyclableBatchThreadPoolExecutor {
|
||||||
|
|
||||||
private final ExecutorService executor;
|
private final ExecutorService executor;
|
||||||
|
|
||||||
public RecyclableBatchThreadPoolExecutor(int poolSize){
|
public RecyclableBatchThreadPoolExecutor(int poolSize){
|
||||||
@ -90,7 +97,6 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
* @param processor 单条数据处理函数
|
* @param processor 单条数据处理函数
|
||||||
* @return 处理结果集合
|
* @return 处理结果集合
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public <T,R> List<R> process(List<T> data, int batchSize, Function<T,R> processor) {
|
public <T,R> List<R> process(List<T> data, int batchSize, Function<T,R> processor) {
|
||||||
if (batchSize < 1) {
|
if (batchSize < 1) {
|
||||||
throw new IllegalArgumentException("batchSize >= 1");
|
throw new IllegalArgumentException("batchSize >= 1");
|
||||||
@ -99,7 +105,7 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
int batchCount = batches.size();
|
int batchCount = batches.size();
|
||||||
int minusOne = batchCount - 1;
|
int minusOne = batchCount - 1;
|
||||||
ArrayDeque<IdempotentTask<R>> taskQueue = new ArrayDeque<>(minusOne);
|
ArrayDeque<IdempotentTask<R>> taskQueue = new ArrayDeque<>(minusOne);
|
||||||
Map<Integer,Future<ResultWarp<R>>> futuresMap = new HashMap<>();
|
Map<Integer,Future<TaskResult<R>>> futuresMap = new HashMap<>();
|
||||||
// 提交前 batchCount-1 批任务
|
// 提交前 batchCount-1 批任务
|
||||||
for (int i = 0 ; i < minusOne ; i++) {
|
for (int i = 0 ; i < minusOne ; i++) {
|
||||||
final int index = i;
|
final int index = i;
|
||||||
@ -107,15 +113,15 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
taskQueue.add(task);
|
taskQueue.add(task);
|
||||||
futuresMap.put(i,executor.submit(task));
|
futuresMap.put(i,executor.submit(task));
|
||||||
}
|
}
|
||||||
Object[] arr = new Object[batchCount];
|
@SuppressWarnings("unchecked")
|
||||||
|
List<R>[] resultArr = new ArrayList[batchCount];
|
||||||
// 处理最后一批
|
// 处理最后一批
|
||||||
arr[minusOne] = processBatch(batches.get(minusOne), processor);
|
resultArr[minusOne] = processBatch(batches.get(minusOne), processor);
|
||||||
// 处理剩余任务
|
// 处理剩余任务
|
||||||
processRemainingTasks(taskQueue, futuresMap,arr);
|
processRemainingTasks(taskQueue, futuresMap,resultArr);
|
||||||
//排序、过滤null
|
//排序、过滤null
|
||||||
return Stream.of(arr)
|
return Stream.of(resultArr)
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.map(p -> (List<R>) p)
|
|
||||||
.flatMap(List::stream)
|
.flatMap(List::stream)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
@ -124,20 +130,20 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
* 处理剩余任务并收集结果
|
* 处理剩余任务并收集结果
|
||||||
* @param taskQueue 任务队列
|
* @param taskQueue 任务队列
|
||||||
* @param futuresMap 异步任务映射
|
* @param futuresMap 异步任务映射
|
||||||
* @param arr 结果存储数组
|
* @param resultArr 结果存储数组
|
||||||
*/
|
*/
|
||||||
private <R> void processRemainingTasks(Queue<IdempotentTask<R>> taskQueue, Map<Integer,Future<ResultWarp<R>>> futuresMap,Object[] arr) {
|
private <R> void processRemainingTasks(Queue<IdempotentTask<R>> taskQueue, Map<Integer,Future<TaskResult<R>>> futuresMap, List<R>[] resultArr) {
|
||||||
// 主消费未执行任务
|
// 主消费未执行任务
|
||||||
IdempotentTask<R> task;
|
IdempotentTask<R> task;
|
||||||
while ((task = taskQueue.poll()) != null) {
|
while ((task = taskQueue.poll()) != null) {
|
||||||
try {
|
try {
|
||||||
ResultWarp<R> call = task.call();
|
TaskResult<R> call = task.call();
|
||||||
if (call.effective) {
|
if (call.effective) {
|
||||||
// 取消被主线程执行任务
|
// 取消被主线程执行任务
|
||||||
Future<ResultWarp<R>> future = futuresMap.remove(task.index);
|
Future<TaskResult<R>> future = futuresMap.remove(task.index);
|
||||||
future.cancel(false);
|
future.cancel(false);
|
||||||
//加入结果集
|
//加入结果集
|
||||||
arr[task.index] = call.result;
|
resultArr[task.index] = call.result;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// 不处理异常
|
// 不处理异常
|
||||||
@ -146,9 +152,9 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
}
|
}
|
||||||
futuresMap.forEach((index,future)->{
|
futuresMap.forEach((index,future)->{
|
||||||
try {
|
try {
|
||||||
ResultWarp<R> resultWarp = future.get();
|
TaskResult<R> taskResult = future.get();
|
||||||
if(resultWarp.effective){
|
if(taskResult.effective){
|
||||||
arr[index] = resultWarp.result;
|
resultArr[index] = taskResult.result;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
@ -159,7 +165,7 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
/**
|
/**
|
||||||
* 幂等任务包装类,确保任务只执行一次
|
* 幂等任务包装类,确保任务只执行一次
|
||||||
*/
|
*/
|
||||||
private static class IdempotentTask<R> implements Callable<ResultWarp<R>> {
|
private static class IdempotentTask<R> implements Callable<TaskResult<R>> {
|
||||||
|
|
||||||
private final int index;
|
private final int index;
|
||||||
private final Callable<List<R>> delegate;
|
private final Callable<List<R>> delegate;
|
||||||
@ -171,21 +177,21 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultWarp<R> call() throws Exception {
|
public TaskResult<R> call() throws Exception {
|
||||||
if (executed.compareAndSet(false, true)) {
|
if (executed.compareAndSet(false, true)) {
|
||||||
return new ResultWarp<>(delegate.call(), true);
|
return new TaskResult<>(delegate.call(), true);
|
||||||
}
|
}
|
||||||
return new ResultWarp<>(null, false);
|
return new TaskResult<>(null, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 结果包装类,标记结果有效性
|
* 结果包装类,标记结果有效性
|
||||||
*/
|
*/
|
||||||
private static class ResultWarp<R>{
|
private static class TaskResult<R>{
|
||||||
private final List<R> result;
|
private final List<R> result;
|
||||||
private final boolean effective;
|
private final boolean effective;
|
||||||
ResultWarp(List<R> result, boolean effective){
|
TaskResult(List<R> result, boolean effective){
|
||||||
this.result = result;
|
this.result = result;
|
||||||
this.effective = effective;
|
this.effective = effective;
|
||||||
}
|
}
|
||||||
@ -197,7 +203,7 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
* @param batchSize 每批次数据量
|
* @param batchSize 每批次数据量
|
||||||
* @return 分片后的二维集合
|
* @return 分片后的二维集合
|
||||||
*/
|
*/
|
||||||
public static <T> List<List<T>> splitData(List<T> data, int batchSize) {
|
private static <T> List<List<T>> splitData(List<T> data, int batchSize) {
|
||||||
int batchCount = (data.size() + batchSize - 1) / batchSize;
|
int batchCount = (data.size() + batchSize - 1) / batchSize;
|
||||||
return new AbstractList<List<T>>() {
|
return new AbstractList<List<T>>() {
|
||||||
@Override
|
@Override
|
||||||
@ -220,8 +226,69 @@ public class RecyclableBatchThreadPoolExecutor {
|
|||||||
* @param processor 处理函数
|
* @param processor 处理函数
|
||||||
* @return 处理结果
|
* @return 处理结果
|
||||||
*/
|
*/
|
||||||
public static <T,R> List<R> processBatch(List<T> batch, Function<T,R> processor) {
|
private static <T,R> List<R> processBatch(List<T> batch, Function<T,R> processor) {
|
||||||
return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList());
|
return batch.stream().map(processor).filter(Objects::nonNull).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理Warp集合
|
||||||
|
*
|
||||||
|
* <pre>{@code
|
||||||
|
* Warp<String> warp1 = Warp.of(this::select1);
|
||||||
|
* Warp<List<String>> warp2 = Warp.of(this::select2);
|
||||||
|
* executor.processByWarp(warp1, warp2);
|
||||||
|
* String r1 = warp1.get();
|
||||||
|
* List<String> r2 = warp2.get();
|
||||||
|
* }</pre>
|
||||||
|
*
|
||||||
|
* @param warps Warp集合
|
||||||
|
* @return Warp集合,此方法返回结果为空的不会被过滤
|
||||||
|
*/
|
||||||
|
public List<Warp<?>> processByWarp(Warp<?>... warps) {
|
||||||
|
return process(Arrays.asList(warps), 1, Warp::execute);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理逻辑包装类
|
||||||
|
* @param <R> 结果类型
|
||||||
|
*/
|
||||||
|
public static class Warp<R>{
|
||||||
|
|
||||||
|
private Warp(Supplier<R> supplier){
|
||||||
|
Objects.requireNonNull(supplier);
|
||||||
|
this.supplier = supplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建Warp
|
||||||
|
* @param supplier 执行逻辑
|
||||||
|
* @return Warp
|
||||||
|
* @param <R> 结果类型
|
||||||
|
*/
|
||||||
|
public static <R> Warp<R> of(Supplier<R> supplier){
|
||||||
|
return new Warp<>(supplier);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Supplier<R> supplier;
|
||||||
|
private R result;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取结果
|
||||||
|
* @return 结果
|
||||||
|
*/
|
||||||
|
public R get() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
public Warp<R> execute() {
|
||||||
|
result = supplier.get();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,9 @@
|
|||||||
package cn.hutool.core.thread;
|
package cn.hutool.core.thread;
|
||||||
|
|
||||||
|
import cn.hutool.core.thread.RecyclableBatchThreadPoolExecutor.Warp;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
@ -17,6 +15,11 @@ import java.util.function.Function;
|
|||||||
*/
|
*/
|
||||||
public class RecyclableBatchThreadPoolExecutorTest {
|
public class RecyclableBatchThreadPoolExecutorTest {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 批量处理数据
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void test() throws InterruptedException {
|
public void test() throws InterruptedException {
|
||||||
int corePoolSize = 10;// 线程池大小
|
int corePoolSize = 10;// 线程池大小
|
||||||
@ -25,6 +28,24 @@ public class RecyclableBatchThreadPoolExecutorTest {
|
|||||||
test(corePoolSize,batchSize,clientCount);
|
test(corePoolSize,batchSize,clientCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 普通查询接口加速
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test2() {
|
||||||
|
RecyclableBatchThreadPoolExecutor executor = new RecyclableBatchThreadPoolExecutor(10);
|
||||||
|
long s = System.nanoTime();
|
||||||
|
Warp<String> warp1 = Warp.of(this::select1);
|
||||||
|
Warp<List<String>> warp2 = Warp.of(this::select2);
|
||||||
|
executor.processByWarp(warp1, warp2);
|
||||||
|
Map<String, Object> map = new HashMap<>();
|
||||||
|
map.put("key1",warp1.get());
|
||||||
|
map.put("key2",warp2.get());
|
||||||
|
long d = System.nanoTime() - s;
|
||||||
|
System.out.printf("总耗时:%.2f秒%n",d/1e9);
|
||||||
|
System.out.println(map);
|
||||||
|
}
|
||||||
|
|
||||||
public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{
|
public void test(int corePoolSize,int batchSize,int clientCount ) throws InterruptedException{
|
||||||
RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize);
|
RecyclableBatchThreadPoolExecutor processor = new RecyclableBatchThreadPoolExecutor(corePoolSize);
|
||||||
// 模拟多个调用者线程提交任务
|
// 模拟多个调用者线程提交任务
|
||||||
@ -73,4 +94,22 @@ public class RecyclableBatchThreadPoolExecutorTest {
|
|||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String select1() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(3000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return "1";
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> select2() {
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return Arrays.asList("1","2","3");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user