add TriggerQueueTaskTable

This commit is contained in:
Looly
2025-10-20 00:02:40 +08:00
parent 2672932f3b
commit 94cd4098d2
11 changed files with 503 additions and 197 deletions

View File

@@ -23,6 +23,7 @@ import cn.hutool.v7.core.lang.tuple.Triple;
import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
@@ -36,7 +37,7 @@ import java.util.List;
* @author Looly
* @since 6.0.0
*/
public class TripleTable<L, M, R> implements Serializable {
public class TripleTable<L, M, R> implements Iterable<Triple<L, M, R>>, Serializable {
@Serial
private static final long serialVersionUID = 1L;
@@ -53,6 +54,8 @@ public class TripleTable<L, M, R> implements Serializable {
*/
private final List<R> rights;
// region ----- 构造
/**
* 构造
*
@@ -92,6 +95,7 @@ public class TripleTable<L, M, R> implements Serializable {
this.middles = middles;
this.rights = rights;
}
// endregion
// region ----- getLeft
@@ -167,7 +171,7 @@ public class TripleTable<L, M, R> implements Serializable {
* @param index 索引
* @return 左值
*/
public L getLeft(final int index){
public L getLeft(final int index) {
return this.lefts.get(index);
}
@@ -177,7 +181,7 @@ public class TripleTable<L, M, R> implements Serializable {
* @param index 索引
* @return 中值
*/
public M getMiddle(final int index){
public M getMiddle(final int index) {
return this.middles.get(index);
}
@@ -187,7 +191,7 @@ public class TripleTable<L, M, R> implements Serializable {
* @param index 索引
* @return 右值
*/
public R getRight(final int index){
public R getRight(final int index) {
return this.rights.get(index);
}
@@ -467,4 +471,23 @@ public class TripleTable<L, M, R> implements Serializable {
this.rights.remove(index);
return this;
}
@Override
public Iterator<Triple<L, M, R>> iterator() {
final int size = this.size();
return new Iterator<>() {
private int index = -1;
@Override
public boolean hasNext() {
return index + 1 < size;
}
@Override
public Triple<L, M, R> next() {
index++;
return new Triple<>(getLeft(index), getMiddle(index), getRight(index));
}
};
}
}

View File

@@ -26,6 +26,15 @@ import java.util.TimeZone;
*/
public class CronConfig {
/**
* 创建Cron配置
*
* @return Cron配置
*/
public static CronConfig of(){
return new CronConfig();
}
/**
* 时区
*/
@@ -38,6 +47,10 @@ public class CronConfig {
* 是否为守护线程
*/
private boolean daemon;
/**
* 是否使用触发队列
*/
private boolean useTriggerQueue;
/**
* 构造
@@ -45,6 +58,15 @@ public class CronConfig {
public CronConfig() {
}
/**
* 获得时区,默认为 {@link TimeZone#getDefault()}
*
* @return 时区
*/
public TimeZone getTimeZone() {
return this.timezone;
}
/**
* 设置时区
*
@@ -56,15 +78,6 @@ public class CronConfig {
return this;
}
/**
* 获得时区,默认为 {@link TimeZone#getDefault()}
*
* @return 时区
*/
public TimeZone getTimeZone() {
return this.timezone;
}
/**
* 是否支持秒匹配
*
@@ -104,4 +117,26 @@ public class CronConfig {
this.daemon = daemon;
return this;
}
/**
* 是否使用触发队列
*
* @return {@code true}使用,{@code false}不使用
*/
public boolean isUseTriggerQueue() {
return this.useTriggerQueue;
}
/**
* 设置是否使用触发队列<br>
* {@code true}则使用对接方式触发,此时会预先将任务的下一次触发时间加入队列,队列中任务的触发时间小于当前时间时,则从队列中取出并执行。<br>
* {@code false}则使用普通方式触发此时会检查任务表当任务表中的表达式匹配指定时间时则执行相应的Task。
*
* @param useTriggerQueue {@code true}使用,{@code false}不使用
* @return this
*/
public CronConfig setUseTriggerQueue(final boolean useTriggerQueue) {
this.useTriggerQueue = useTriggerQueue;
return this;
}
}

View File

@@ -0,0 +1,58 @@
package cn.hutool.v7.cron;
import cn.hutool.v7.cron.task.CronTask;
import java.util.concurrent.locks.Lock;
/**
* 基于匹配的任务表<br>
* 每次检查任务表中的表达式是否匹配指定时间如果匹配则执行相应的Task
*
* @author Looly
*/
public class MatchTaskTable extends TaskTable {
/**
* 构造, 默认容量为{@link TaskTable#DEFAULT_CAPACITY}
*/
public MatchTaskTable() {
this(DEFAULT_CAPACITY);
}
/**
* 构造
*
* @param initialCapacity 初始容量
*/
public MatchTaskTable(final int initialCapacity) {
super(initialCapacity);
}
@Override
public void execute(final Scheduler scheduler, final long millis) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
executeTaskIfMatchInternal(scheduler, millis);
} finally {
readLock.unlock();
}
}
/**
* 如果时间匹配则执行相应的Task无锁
*
* @param scheduler {@link Scheduler}
* @param millis 时间毫秒
* @since 3.1.1
*/
private void executeTaskIfMatchInternal(final Scheduler scheduler, final long millis) {
final int size = size();
for (int i = 0; i < size; i++) {
if (this.table.getMiddle(i).match(scheduler.config.getTimeZone(), millis, scheduler.config.isMatchSecond())) {
scheduler.taskManager.spawnExecutor(
new CronTask(this.table.getLeft(i), this.table.getMiddle(i), this.table.getRight(i)));
}
}
}
}

View File

@@ -92,7 +92,7 @@ public class Scheduler implements Serializable {
/**
* 定时任务表
*/
protected TaskTable taskTable;
private TaskTable taskTable;
/**
* 线程池用于执行TaskLauncher和TaskExecutor
*/
@@ -111,7 +111,7 @@ public class Scheduler implements Serializable {
* 使用默认配置
*/
public Scheduler() {
this(new CronConfig());
this(CronConfig.of());
}
/**
@@ -122,8 +122,8 @@ public class Scheduler implements Serializable {
public Scheduler(final CronConfig config) {
this.config = config;
lock = new ReentrantLock();
this.taskTable = new TaskTable();
this.listenerManager = new TaskListenerManager();
clear();
}
/**
@@ -373,7 +373,7 @@ public class Scheduler implements Serializable {
* @since 4.1.17
*/
public Scheduler clear() {
this.taskTable = new TaskTable();
this.taskTable = TaskTableFactory.create(this.config);
return this;
}
@@ -473,6 +473,15 @@ public class Scheduler implements Serializable {
return this;
}
/**
* 执行任务表中匹配时间戳的定时任务
*
* @param millis 毫秒时间戳
*/
public void execute(final long millis){
this.taskTable.execute(this, millis);
}
/**
* 检查定时任务是否已经启动
*

View File

@@ -31,10 +31,10 @@ public record TaskLauncher(Scheduler scheduler, long millis) implements Runnable
public void run() {
try{
//匹配秒部分由用户定义决定,始终不匹配年
scheduler.taskTable.executeTaskIfMatch(this.scheduler, this.millis);
this.scheduler.execute(this.millis);
} finally {
//结束通知
scheduler.taskManager.notifyLauncherCompleted(this);
this.scheduler.taskManager.notifyLauncherCompleted(this);
}
}
}

View File

@@ -19,7 +19,6 @@ package cn.hutool.v7.cron;
import cn.hutool.v7.core.map.TripleTable;
import cn.hutool.v7.core.text.StrUtil;
import cn.hutool.v7.cron.pattern.CronPattern;
import cn.hutool.v7.cron.task.CronTask;
import cn.hutool.v7.cron.task.Task;
import java.io.Serial;
@@ -31,29 +30,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 定时任务表<br>
* 任务表将ID、表达式、任务一一对应,定时任务执行过程中,会周期性检查定时任务表中的所有任务表达式匹配情况,从而执行其对应的任务<br>
* 任务表将ID、表达式、任务一一对应<br>
* 任务的添加、移除使用读写锁保证线程安全性
*
* @author Looly
*/
public class TaskTable implements Serializable {
public abstract class TaskTable implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 默认任务表大小:10
* 默认任务表大小:256
*/
public static final int DEFAULT_CAPACITY = 10;
public static final int DEFAULT_CAPACITY = 256;
/**
* 读写锁,保证线程安全
*/
private final ReadWriteLock lock;
final ReadWriteLock lock;
/**
* 任务表ID、表达式、任务一一对应。使用TripleTable存储便于快速查找和更新<br>
*/
private final TripleTable<String, CronPattern, Task> table;
final TripleTable<String, CronPattern, Task> table;
/**
* 构造
@@ -69,10 +68,151 @@ public class TaskTable implements Serializable {
*/
public TaskTable(final int initialCapacity) {
lock = new ReentrantReadWriteLock();
this.table = new TripleTable<>(initialCapacity);
}
/**
* 任务表大小,加入的任务数
*
* @return 任务表大小,加入的任务数
* @since 4.0.2
*/
public int size() {
return this.table.size();
}
/**
* 任务表是否为空
*
* @return true为空
* @since 4.0.2
*/
public boolean isEmpty() {
return size() < 1;
}
/**
* 获取所有ID返回不可变列表即列表不可修改
*
* @return ID列表
* @since 4.6.7
*/
public List<String> getIds() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getLefts();
} finally {
readLock.unlock();
}
}
// region ----- getTask
/**
* 获取所有定时任务,返回不可变列表,即列表不可修改
*
* @return 定时任务列表
* @since 4.6.7
*/
public List<Task> getTasks() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getRights();
} finally {
readLock.unlock();
}
}
/**
* 获得指定位置的{@link Task}
*
* @param index 位置
* @return {@link Task}
* @since 3.1.1
*/
public Task getTask(final int index) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getRight(index);
} finally {
readLock.unlock();
}
}
/**
* 获得指定id的{@link Task}
*
* @param id ID
* @return {@link Task}
* @since 3.1.1
*/
public Task getTask(final String id) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getRightByLeft(id);
} finally {
readLock.unlock();
}
}
// endregion
// region ----- getPattern
/**
* 获取所有定时任务表达式,返回不可变列表,即列表不可修改
*
* @return 定时任务表达式列表
* @since 4.6.7
*/
public List<CronPattern> getPatterns() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getMiddles();
} finally {
readLock.unlock();
}
}
/**
* 获得指定id的{@link CronPattern}
*
* @param id ID
* @return {@link CronPattern}
* @since 3.1.1
*/
public CronPattern getPattern(final String id) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getMiddleByLeft(id);
} finally {
readLock.unlock();
}
}
/**
* 获得指定位置的{@link CronPattern}
*
* @param index 位置
* @return {@link CronPattern}
* @since 3.1.1
*/
public CronPattern getPattern(final int index) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getMiddle(index);
} finally {
readLock.unlock();
}
}
// endregion
/**
* 新增Task
*
@@ -95,54 +235,6 @@ public class TaskTable implements Serializable {
return this;
}
/**
* 获取所有ID返回不可变列表即列表不可修改
*
* @return ID列表
* @since 4.6.7
*/
public List<String> getIds() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getLefts();
} finally {
readLock.unlock();
}
}
/**
* 获取所有定时任务表达式,返回不可变列表,即列表不可修改
*
* @return 定时任务表达式列表
* @since 4.6.7
*/
public List<CronPattern> getPatterns() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getMiddles();
} finally {
readLock.unlock();
}
}
/**
* 获取所有定时任务,返回不可变列表,即列表不可修改
*
* @return 定时任务列表
* @since 4.6.7
*/
public List<Task> getTasks() {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getRights();
} finally {
readLock.unlock();
}
}
/**
* 移除Task
*
@@ -187,110 +279,6 @@ public class TaskTable implements Serializable {
return false;
}
/**
* 获得指定位置的{@link Task}
*
* @param index 位置
* @return {@link Task}
* @since 3.1.1
*/
public Task getTask(final int index) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return this.table.getRight(index);
} finally {
readLock.unlock();
}
}
/**
* 获得指定id的{@link Task}
*
* @param id ID
* @return {@link Task}
* @since 3.1.1
*/
public Task getTask(final String id) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getRightByLeft(id);
} finally {
readLock.unlock();
}
}
/**
* 获得指定id的{@link CronPattern}
*
* @param id ID
* @return {@link CronPattern}
* @since 3.1.1
*/
public CronPattern getPattern(final String id) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getMiddleByLeft(id);
} finally {
readLock.unlock();
}
}
/**
* 获得指定位置的{@link CronPattern}
*
* @param index 位置
* @return {@link CronPattern}
* @since 3.1.1
*/
public CronPattern getPattern(final int index) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return table.getMiddle(index);
} finally {
readLock.unlock();
}
}
/**
* 任务表大小,加入的任务数
*
* @return 任务表大小,加入的任务数
* @since 4.0.2
*/
public int size() {
return this.table.size();
}
/**
* 任务表是否为空
*
* @return true为空
* @since 4.0.2
*/
public boolean isEmpty() {
return size() < 1;
}
/**
* 如果时间匹配则执行相应的Task带读锁
*
* @param scheduler {@link Scheduler}
* @param millis 时间毫秒
*/
public void executeTaskIfMatch(final Scheduler scheduler, final long millis) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
executeTaskIfMatchInternal(scheduler, millis);
} finally {
readLock.unlock();
}
}
@Override
public String toString() {
final int size = this.size();
@@ -303,19 +291,10 @@ public class TaskTable implements Serializable {
}
/**
* 如果时间匹配则执行相应的Task,无锁
* 根据给定的时间戳匹配任务,如果匹配成功则使用调度器执行相应的Task
*
* @param scheduler {@link Scheduler}
* @param millis 时间毫秒
* @since 3.1.1
* @param scheduler 调度器
* @param millis 时间
*/
private void executeTaskIfMatchInternal(final Scheduler scheduler, final long millis) {
final int size = size();
for (int i = 0; i < size; i++) {
if (this.table.getMiddle(i).match(scheduler.config.getTimeZone(), millis, scheduler.config.isMatchSecond())) {
scheduler.taskManager.spawnExecutor(
new CronTask(this.table.getLeft(i), this.table.getMiddle(i), this.table.getRight(i)));
}
}
}
public abstract void execute(final Scheduler scheduler, final long millis);
}

View File

@@ -0,0 +1,19 @@
package cn.hutool.v7.cron;
/**
* 任务表工厂类
*
* @author Looly
*/
public class TaskTableFactory {
/**
* 创建任务表
*
* @param config 定时任务配置
* @return 任务表
*/
public static TaskTable create(CronConfig config) {
return config.isUseTriggerQueue() ? new TriggerQueueTaskTable() : new MatchTaskTable();
}
}

View File

@@ -0,0 +1,138 @@
package cn.hutool.v7.cron;
import cn.hutool.v7.core.text.StrUtil;
import cn.hutool.v7.cron.pattern.CronPattern;
import cn.hutool.v7.cron.task.CronTask;
import cn.hutool.v7.cron.task.Task;
import cn.hutool.v7.log.Log;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Lock;
/**
* 带有触发队列的定时任务表<br>
* 在用户添加任务时,会把下一次触发时间加入到队列中,会从队列中取出任务并执行。<br>
* 执行任务时,会检查队列,当队列中任务的触发时间小于当前时间时,则从队列中取出并执行。<br>
* 执行后,将下一次触发时间加入到队列中。
*
* @author Looly
*/
public class TriggerQueueTaskTable extends TaskTable {
private static final Log log = Log.get();
private final PriorityBlockingQueue<TriggerTime> triggerQueue;
/**
* 构造, 默认容量为{@link TaskTable#DEFAULT_CAPACITY}
*/
public TriggerQueueTaskTable() {
this(DEFAULT_CAPACITY);
}
/**
* 构造
*
* @param initialCapacity 初始容量
*/
public TriggerQueueTaskTable(final int initialCapacity) {
super(initialCapacity);
this.triggerQueue = new PriorityBlockingQueue<>(initialCapacity);
}
@Override
public TriggerQueueTaskTable add(String id, CronPattern pattern, Task task) {
super.add(id, pattern, task);
// 将下一个触发时间及任务添加到队列中
this.triggerQueue.offer(new TriggerTime(id, pattern.nextMatchFromNow()));
return this;
}
@Override
public boolean remove(String id) {
// 移除队列中的任务
this.triggerQueue.removeIf(task -> StrUtil.equals(task.id(), id));
return super.remove(id);
}
@Override
public boolean updatePattern(String id, CronPattern pattern) {
// 移除队列中的任务
this.triggerQueue.removeIf(task -> StrUtil.equals(task.id(), id));
// 将下一个触发时间及任务添加到队列中
this.triggerQueue.offer(new TriggerTime(id, pattern.nextMatchFromNow()));
return super.updatePattern(id, pattern);
}
@Override
public void execute(final Scheduler scheduler, final long millis) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
executeTaskBeforeInternal(scheduler, millis);
} finally {
readLock.unlock();
}
}
/**
* 执行给定时间戳以及之前所有的任务<br>
* 此方法依赖于触发队列
*
* @param scheduler {@link Scheduler}
* @param millis 时间毫秒
*/
private void executeTaskBeforeInternal(final Scheduler scheduler, final long millis) {
while (true) {
final TriggerTime triggerTime = this.triggerQueue.poll();
if (null == triggerTime) {
// 队列空
break;
}
final long triggerTimestamp = triggerTime.timestamp();
if (triggerTimestamp > millis) {
// 任务时间未到
this.triggerQueue.offer(triggerTime);
break;
}
// 执行任务
final String id = triggerTime.id();
scheduler.taskManager.spawnExecutor(getCronTask(id));
// 将下一个触发时间及任务添加到队列中
long nextMillis = millis;
if((millis - triggerTimestamp) / 1000 == 0){
// 秒级别相同,说明这秒已经执行,从下一秒开始
nextMillis += 1000;
}
this.triggerQueue.offer(new TriggerTime(id, getPattern(id).nextMatch(nextMillis)));
}
}
/**
* 获取一个{@link CronTask},无锁
*
* @param id ID
* @return {@link CronTask}
* @since 7.0.0
*/
private CronTask getCronTask(final String id) {
final int index = this.table.indexOfLeft(id);
return index > -1 ? new CronTask(id, this.table.getMiddle(index), this.table.getRight(index)) : null;
}
/**
* 触发时间
*
* @param id ID
* @param timestamp 触发时间
*/
private record TriggerTime(String id, long timestamp) implements Comparable<TriggerTime> {
@Override
public int compareTo(TriggerTime other) {
return Long.compare(this.timestamp, other.timestamp);
}
}
}

View File

@@ -106,6 +106,8 @@ public class CronPattern {
this.matchers = PatternParser.parse(pattern);
}
// region ----- match
/**
* 给定时间是否匹配定时任务表达式
*
@@ -138,7 +140,7 @@ public class CronPattern {
* @param isMatchSecond 是否匹配秒
* @return 如果匹配返回 {@code true}, 否则返回 {@code false}
*/
public boolean match(final Calendar calendar, final boolean isMatchSecond) {
public boolean match(final Calendar calendar, final boolean isMatchSecond) {
return match(PatternUtil.getFields(calendar, isMatchSecond));
}
@@ -153,6 +155,19 @@ public class CronPattern {
public boolean match(final LocalDateTime dateTime, final boolean isMatchSecond) {
return match(PatternUtil.getFields(dateTime, isMatchSecond));
}
// endregion
// region ----- nextMatch
/**
* 从当前时间开始,返回匹配到的下一个时间,如果当前时间匹配,直接返回
*
* @return 匹配到的下一个时间时间戳
* @since 7.0.0
*/
public long nextMatchFromNow() {
return nextMatch(Calendar.getInstance()).getTimeInMillis();
}
/**
* 返回匹配到的下一个时间
@@ -162,7 +177,7 @@ public class CronPattern {
*/
public Calendar nextMatchAfter(Calendar calendar) {
// issue#I9FQUA当提供的时间已经匹配表达式时增加1秒以匹配下一个时间
if(match(calendar, true)){
if (match(calendar, true)) {
final Calendar newCalendar = Calendar.getInstance(calendar.getTimeZone());
newCalendar.setTimeInMillis(calendar.getTimeInMillis() + 1000);
calendar = newCalendar;
@@ -171,6 +186,17 @@ public class CronPattern {
return nextMatch(calendar);
}
/**
* 从指定时间戳开始,返回匹配到的下一个时间,如果当前时间匹配,直接返回
*
* @param millis 时间戳
* @return 匹配到的下一个时间时间戳
* @since 7.0.0
*/
public long nextMatch(long millis) {
return nextMatch(CalendarUtil.calendar(millis)).getTimeInMillis();
}
/**
* 返回匹配到的下一个时间,如果给定时间匹配,直接返回
*
@@ -183,10 +209,12 @@ public class CronPattern {
return next;
}
// 当定义周时,可能获取到的下一个日期的周并不匹配,因此天+1直到指定周匹配
next.set(Calendar.DAY_OF_MONTH, next.get(Calendar.DAY_OF_MONTH) + 1);
next = CalendarUtil.beginOfDay(next);
return nextMatch(next);
}
// endregion
@Override
public boolean equals(final Object o) {
@@ -235,9 +263,9 @@ public class CronPattern {
private Calendar nextMatchAfter(final int[] values, final TimeZone zone) {
Calendar minMatch = null;
for (final PatternMatcher matcher : matchers) {
if(null == minMatch){
if (null == minMatch) {
minMatch = matcher.nextMatchAfter(values, zone);
}else{
} else {
minMatch = CompareUtil.min(minMatch, matcher.nextMatchAfter(values, zone));
}
}

View File

@@ -28,7 +28,7 @@ public class TaskTableTest {
@Test
public void taskTableTest(){
final TaskTable taskTable = new TaskTable();
final TaskTable taskTable = new MatchTaskTable();
taskTable.add(IdUtil.fastUUID(), new CronPattern("*/10 * * * * *"), ()-> Console.log("Task 1"));
taskTable.add(IdUtil.fastUUID(), new CronPattern("*/20 * * * * *"), ()-> Console.log("Task 2"));
taskTable.add(IdUtil.fastUUID(), new CronPattern("*/30 * * * * *"), ()-> Console.log("Task 3"));

View File

@@ -0,0 +1,17 @@
package cn.hutool.v7.cron.demo;
import cn.hutool.v7.core.lang.Console;
import cn.hutool.v7.cron.CronConfig;
import cn.hutool.v7.cron.Scheduler;
public class TriggerQueueTest {
public static void main(String[] args) {
final Scheduler scheduler = new Scheduler(CronConfig.of().setMatchSecond(true).setUseTriggerQueue(true));
scheduler.schedule("*/10 * * * * *",
() -> Console.log("Hutool task */10 running at: [{}]", System.currentTimeMillis()));
scheduler.schedule("*/3 * * * * *",
() -> Console.log("Hutool task */3 running at: [{}]", System.currentTimeMillis()));
scheduler.start();
}
}