diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java index 3418bd7cb..28c526135 100644 --- a/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java +++ b/hutool-core/src/main/java/cn/hutool/core/thread/ThreadUtil.java @@ -1,5 +1,7 @@ package cn.hutool.core.thread; +import cn.hutool.core.thread.rejected.RejectedExecutionHandlerUtility; + import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -8,6 +10,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -40,6 +43,114 @@ public class ThreadUtil { return builder.build(); } + /** + * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue,默认队列大小为1024
+	 * 
+ * + * @param nThreads 线程池大小 + * @param threadNamePrefix 线程名称前缀 + * @return ExecutorService + */ + public static ExecutorService newFixedExecutor(int nThreads, String threadNamePrefix) { + ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); + ExecutorBuilder builder = ExecutorBuilder.create().setThreadFactory(threadFactory); + if (nThreads > 0) { + builder.setCorePoolSize(nThreads) + .setMaxPoolSize(nThreads); + } + return builder.build(); + } + + /** + * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue,默认队列大小为1024
+	 *     3. 当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
+	 * 
+ * + * @param nThreads 线程池大小 + * @param threadNamePrefix 线程名称前缀 + * @return ExecutorService + */ + public static ExecutorService newFixedBlockedExecutor(int nThreads, String threadNamePrefix) { + ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); + ExecutorBuilder builder = ExecutorBuilder.create() + .setCorePoolSize(nThreads) + .setMaxPoolSize(nThreads) + .setThreadFactory(threadFactory) + .setHandler(new RejectedExecutionHandlerUtility.BlockPolicy()); + return builder.build(); + } + + /** + * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue
+	 * 
+ * + * @param nThreads 线程池大小 + * @param maximumQueueSize 队列大小 + * @param threadNamePrefix 线程名称前缀 + * @return ExecutorService + */ + public static ExecutorService newFixedExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) { + return newFixedExecutor(nThreads, maximumQueueSize, threadNamePrefix, null); + } + + /** + * 获取一个新的线程池,默认的策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue
+	 *     3. 当执行拒绝策略的时候会处于阻塞状态,直到能添加到队列中或者被{@link Thread#interrupt()}中断
+	 * 
+ * + * @param nThreads 线程池大小 + * @param threadNamePrefix 线程名称前缀 + * @return ExecutorService + */ + public static ExecutorService newFixedBlockingExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) { + return newFixedExecutor(nThreads, + maximumQueueSize, + threadNamePrefix, + new RejectedExecutionHandlerUtility.BlockPolicy()); + } + + /** + * 获得一个新的线程池,默认策略如下
+ *
+	 *     1. 核心线程数与最大线程数为nThreads指定的大小
+	 *     2. 默认使用LinkedBlockingQueue
+	 * 
+ * + * @param nThreads 线程池大小 + * @param maximumQueueSize 队列大小 + * @param threadNamePrefix 线程名称前缀 + * @param handler 拒绝策略 + * @return ExecutorService + */ + public static ExecutorService newFixedExecutor(int nThreads, + int maximumQueueSize, + String threadNamePrefix, + RejectedExecutionHandler handler) { + + ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build(); + + ExecutorBuilder builder = ExecutorBuilder.create() + .setCorePoolSize(nThreads) + .setMaxPoolSize(nThreads) + .setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize)) + .setThreadFactory(threadFactory) + .setHandler(handler); + + return builder.build(); + } + /** * 获得一个新的线程池,默认的策略如下: *
diff --git a/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java b/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java
new file mode 100644
index 000000000..d85a6f907
--- /dev/null
+++ b/hutool-core/src/main/java/cn/hutool/core/thread/rejected/RejectedExecutionHandlerUtility.java
@@ -0,0 +1,32 @@
+package cn.hutool.core.thread.rejected;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 线程池拒绝策略工具类
+ *
+ * @author luozongle
+ */
+public class RejectedExecutionHandlerUtility {
+
+	/**
+	 * 当任务队列过长时处于阻塞状态,直到添加到队列中
+	 * 如果阻塞过程中被中断,就会抛出{@link InterruptedException}异常
+	 */
+    public static class BlockPolicy implements RejectedExecutionHandler {
+
+        public BlockPolicy() {
+        }
+
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+            try {
+                e.getQueue().put(r);
+            } catch (InterruptedException ex) {
+                throw new RejectedExecutionException("Task " + r + " rejected from " + e);
+            }
+        }
+    }
+}