Ditch Scheduled Tasks! A Hands-On Guide to Distributed Delay Queues with Spring Boot + DelayQueue
Why we no longer rely on scheduled tasks
In many distributed business systems, delayed tasks (cancelling an order on timeout, ending a promotion automatically, message status callbacks, and so on) are a hard requirement. Developers have traditionally implemented them with @Scheduled or a scheduling framework such as Quartz, but these approaches have several pain points:
- Uneven task distribution: with multi-node deployment, tasks are easily executed more than once or skipped;
- Lack of flexibility: changing a task's schedule requires a service restart;
- Difficult high availability: a single node going down interrupts the tasks;
- Limited time precision: "on-time" triggering cannot be guaranteed.
This article walks you through building a high-performance, highly available, horizontally scalable distributed delayed-task system with Spring Boot + DelayQueue. The design targets three goals:
No loss, no duplication, on time.
Overall system design
The architecture consists of four core modules:
- Coordinator: handles node registration and discovery, heartbeat keep-alive, health monitoring, and task transfer.
- Task storage (TaskStorage): persists task data and tracks execution state.
- Task executor (DelayTaskExecutor): the callback interface that runs the business logic.
- Distributed delay queue (DistributedDelayQueue): the system's single entry point; it encapsulates task assignment, scheduling, execution, and node management.
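The coordinator's liveness bookkeeping can be pictured with a small in-memory sketch. It assumes, as the Redis implementation later in this article does, that each heartbeat refreshes a record whose time-to-live is twice the heartbeat period. The class and method names here (HeartbeatSketch, beat, deadNodes) are hypothetical and stand in for the Redis keys; this is an illustration, not the component's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for heartbeat keys with a TTL (hypothetical names). */
class HeartbeatSketch {
    // nodeId -> moment (ms) at which its heartbeat record expires
    private final Map<String, Long> expiresAt = new ConcurrentHashMap<>();
    private final long ttlMillis;

    HeartbeatSketch(long heartbeatPeriodMillis) {
        // TTL is twice the heartbeat period, so a single late beat is tolerated
        this.ttlMillis = heartbeatPeriodMillis * 2;
    }

    /** Record a heartbeat for nodeId at time nowMillis. */
    void beat(String nodeId, long nowMillis) {
        expiresAt.put(nodeId, nowMillis + ttlMillis);
    }

    /** Nodes whose heartbeat record has expired at nowMillis. */
    List<String> deadNodes(long nowMillis) {
        List<String> dead = new ArrayList<>();
        expiresAt.forEach((node, expiry) -> {
            if (expiry <= nowMillis) {
                dead.add(node);
            }
        });
        return dead;
    }
}
```

With a 30-second period, a node that beats at time 0 and then goes silent is reported dead from the 60-second mark onward, while a node that keeps beating never is.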
The system's core goals are the same as a message queue's:
No loss, no duplication, no backlog. These correspond to the three essentials of a delay queue: stable, precise, scalable.
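Everything in the implementation builds on the JDK's java.util.concurrent.DelayQueue, so a minimal standalone sketch of its behavior may help. The Item class and drainInFiringOrder method are hypothetical names used only for illustration; the point is that elements become available in order of their due time, regardless of insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueSketch {
    /** Minimal Delayed element: available once fireAt (epoch ms) has passed. */
    static final class Item implements Delayed {
        final String id;
        final long fireAt;

        Item(String id, long delayMillis) {
            this.id = id;
            this.fireAt = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            if (other instanceof Item item) {
                return Long.compare(fireAt, item.fireAt);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Enqueue three items out of order and return their ids in the order they fire. */
    static List<String> drainInFiringOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("a", 300));
        queue.offer(new Item("b", 100));
        queue.offer(new Item("c", 200));

        List<String> fired = new ArrayList<>();
        try {
            while (fired.size() < 3) {
                // take() blocks until the head of the queue is due
                fired.add(queue.take().id);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return fired;
    }
}
```

The non-blocking poll() variant returns null while nothing is due, which suits a worker loop that scans several queues in turn.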
Code implementation
pom.xml and application.yml
File: /pom.xml
<!-- Core dependencies only; add versions as needed -->
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
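<modelVersion>4.0.0</modelVersion>
<!-- Inherit Spring Boot's dependency management so the version-less dependencies below resolve -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
</parent>
<groupId>com.icoderoad</groupId>
<artifactId>distributed-delay-queue</artifactId>
<version>1.0.0</version>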
<properties>
<java.version>17</java.version>
<spring.boot.version>3.1.0</spring.boot.version>
</properties>
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- For logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- maven compiler etc. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
File: /src/main/resources/application.yml
server:
  port: 8080
spring:
  data:
    redis:
      host: localhost
      port: 6379
      # Set a password if your Redis requires one
      # password: your_password
ptc:
  delay:
    heartbeat-period: 30        # heartbeat period (s)
    pull-initial-delay: 0       # initial delay before the first pull (s)
    pull-period: 600            # pull period (s), default 10 minutes
    health-initial-delay: 120   # initial delay before the first health check (s)
    health-period: 180          # health check period (s)
    remove-initial-delay: 3600  # initial delay before purging executed records (s), default 1 hour
    remove-period: 3600         # purge period for executed records (s)
Constants and properties
File: /src/main/java/com/icoderoad/delayqueue/constants/DelayConstant.java
package com.icoderoad.delayqueue.constants;
/**
* Constants related to the delay queue
*/
public class DelayConstant {
public static final String NODES_KEY = "ptc:nodes";
public static final String NODE_HEARTBEAT_KEY_PREFIX = "ptc:heartbeat:";
public static final String NODES_HEALTH_KEY = "ptc:nodes:health";
public static final String DELAY_TASK_KEY_PREFIX = "ptc:delay:task:";
public static final String DELAY_EXECUTED_KEY = "ptc:delay:executed";
public static final String DELAY_EXECUTING_KEY_PREFIX = "ptc:delay:executing:";
}
File: /src/main/java/com/icoderoad/delayqueue/config/DelayProperties.java
package com.icoderoad.delayqueue.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Configurable properties
*/
@Data
@ConfigurationProperties(prefix = "ptc.delay")
public class DelayProperties {
private Integer heartbeatPeriod = 30;
private Integer pullInitialDelay = 0;
private Integer pullPeriod = 10 * 60;
private Integer healthInitialDelay = 2 * 60;
private Integer healthPeriod = 3 * 60;
private Integer removeInitialDelay = 60 * 60;
private Integer removePeriod = 60 * 60;
}
Entity class: DelayTask
File: /src/main/java/com/icoderoad/delayqueue/core/DelayTask.java
package com.icoderoad.delayqueue.core;
import lombok.Data;
import org.springframework.lang.NonNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* Delayed task entity; implements the JDK Delayed interface so it can be used with DelayQueue
*/
@Data
public class DelayTask implements Delayed {
private final String queueName;
private final String taskId;
private final long executeTime;
public DelayTask(String queueName, String taskId, long executeTime) {
this.queueName = queueName;
this.taskId = taskId;
this.executeTime = executeTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
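@Override
public int compareTo(@NonNull Delayed o) {
// Compare by execute time for our own type; fall back to remaining delay for any other Delayed
if (o instanceof DelayTask other) {
return Long.compare(this.executeTime, other.getExecuteTime());
}
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}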
public String queueTaskId() {
return this.queueName + ":" + this.taskId;
}
}
Executor interface
File: /src/main/java/com/icoderoad/delayqueue/executor/DelayTaskExecutor.java
package com.icoderoad.delayqueue.executor;
import com.icoderoad.delayqueue.core.DelayTask;
/**
* Business executor interface for delayed tasks
*/
public interface DelayTaskExecutor {
void run(DelayTask delayedTask);
String queueName();
}
Storage interface and Redis implementation
File: /src/main/java/com/icoderoad/delayqueue/storage/TaskStorage.java
package com.icoderoad.delayqueue.storage;
import com.icoderoad.delayqueue.core.DelayTask;
import java.util.List;
/**
* Task storage interface; supports multiple backend implementations
*/
public interface TaskStorage {
void addTask(DelayTask task);
void removeTask(DelayTask task);
List<DelayTask> listTask(String queueName, Long startTime, Long endTime);
void addExecutedTask(DelayTask task);
boolean isExecuted(DelayTask task);
void removeExecutedTask(Long startTime, Long endTime);
}
File: /src/main/java/com/icoderoad/delayqueue/storage/impl/RedisTaskStorage.java
package com.icoderoad.delayqueue.storage.impl;
import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.storage.TaskStorage;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
* Redis implementation; stores tasks in a ZSet (score = executeTime)
*/
public class RedisTaskStorage implements TaskStorage {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addTask(DelayTask task) {
stringRedisTemplate.opsForZSet().add(DelayConstant.DELAY_TASK_KEY_PREFIX + task.getQueueName(),
task.getTaskId(), task.getExecuteTime());
}
@Override
public void removeTask(DelayTask task) {
stringRedisTemplate.opsForZSet().remove(DelayConstant.DELAY_TASK_KEY_PREFIX + task.getQueueName(),
task.getTaskId());
}
@Override
public List<DelayTask> listTask(String queueName, Long startTime, Long endTime) {
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.rangeByScoreWithScores(DelayConstant.DELAY_TASK_KEY_PREFIX + queueName, startTime, endTime);
List<DelayTask> result = new ArrayList<>();
if (CollectionUtils.isEmpty(tuples)) {
return result;
}
for (ZSetOperations.TypedTuple<String> t : tuples) {
Double score = t.getScore();
if (score == null) continue;
result.add(new DelayTask(queueName, t.getValue(), score.longValue()));
}
return result;
}
@Override
public void addExecutedTask(DelayTask task) {
stringRedisTemplate.opsForZSet().add(DelayConstant.DELAY_EXECUTED_KEY, task.queueTaskId(), System.currentTimeMillis());
}
@Override
public boolean isExecuted(DelayTask task) {
Double score = stringRedisTemplate.opsForZSet().score(DelayConstant.DELAY_EXECUTED_KEY, task.queueTaskId());
if (score == null) return false;
return task.getExecuteTime() <= score.longValue();
}
@Override
public void removeExecutedTask(Long startTime, Long endTime) {
stringRedisTemplate.opsForZSet().removeRangeByScore(DelayConstant.DELAY_EXECUTED_KEY, startTime, endTime);
}
}
Coordinator interface and Redis implementation
File: /src/main/java/com/icoderoad/delayqueue/cluster/Coordinator.java
package com.icoderoad.delayqueue.cluster;
import java.util.List;
import java.util.function.BiConsumer;
/**
* Coordinator interface: node registration, heartbeat, health checks, listing active nodes, etc.
*/
public interface Coordinator {
String registerNode();
void unRegisterNode(String nodeId);
List<String> getActiveNodes();
void heartBeat(String nodeId);
void checkClusterHealth(BiConsumer<List<String>, List<String>> consumer);
}
File: /src/main/java/com/icoderoad/delayqueue/cluster/impl/RedisCoordinator.java
package com.icoderoad.delayqueue.cluster.impl;
import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.config.DelayProperties;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
* Redis-based coordinator implementation
*/
public class RedisCoordinator implements Coordinator {
private static final Logger logger = LoggerFactory.getLogger(RedisCoordinator.class);
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Resource
private DelayProperties delayProperties;
private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
@Override
public String registerNode() {
String nodeId = UUID.randomUUID().toString().replace("-", "");
stringRedisTemplate.opsForList().rightPush(DelayConstant.NODES_KEY, nodeId);
setNodeHeartbeat(nodeId);
return nodeId;
}
@Override
public void unRegisterNode(String nodeId) {
stringRedisTemplate.opsForList().remove(DelayConstant.NODES_KEY, 1, nodeId);
stringRedisTemplate.delete(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + nodeId);
heartbeatExecutor.shutdownNow();
}
@Override
public List<String> getActiveNodes() {
List<String> nodes = stringRedisTemplate.opsForList().range(DelayConstant.NODES_KEY, 0, -1);
if (CollectionUtils.isEmpty(nodes)) return new ArrayList<>();
List<String> active = new ArrayList<>();
for (String node : nodes) {
Boolean exist = stringRedisTemplate.hasKey(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + node);
if (Boolean.TRUE.equals(exist)) active.add(node);
}
return active;
}
@Override
public void heartBeat(String nodeId) {
heartbeatExecutor.scheduleAtFixedRate(() -> setNodeHeartbeat(nodeId),
delayProperties.getHeartbeatPeriod(), delayProperties.getHeartbeatPeriod(), TimeUnit.SECONDS);
}
@Override
public void checkClusterHealth(BiConsumer<List<String>, List<String>> consumer) {
RLock lock = redissonClient.getLock(DelayConstant.NODES_HEALTH_KEY);
boolean locked = false;
try {
locked = lock.tryLock();
if (!locked) return;
List<String> nodes = stringRedisTemplate.opsForList().range(DelayConstant.NODES_KEY, 0, -1);
if (CollectionUtils.isEmpty(nodes)) return;
List<String> dead = new ArrayList<>();
for (String node : nodes) {
Boolean exist = stringRedisTemplate.hasKey(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + node);
if (!Boolean.TRUE.equals(exist)) dead.add(node);
}
if (!dead.isEmpty()) {
// Remove dead nodes from the list one at a time (each removal is a single atomic Redis command)
dead.forEach(node -> stringRedisTemplate.opsForList().remove(DelayConstant.NODES_KEY, 1, node));
// Callback: let the caller handle task migration
consumer.accept(nodes, dead);
}
} catch (Exception e) {
logger.error("checkClusterHealth error", e);
} finally {
if (locked) {
try { lock.unlock(); } catch (Exception ignored) {}
}
}
}
private void setNodeHeartbeat(String nodeId) {
stringRedisTemplate.opsForValue().set(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + nodeId,
"1", delayProperties.getHeartbeatPeriod() * 2, TimeUnit.SECONDS);
}
}
Thread pool wrapper (PlasticeneThreadExecutor)
File: /src/main/java/com/icoderoad/delayqueue/util/PlasticeneThreadExecutor.java
package com.icoderoad.delayqueue.util;
import java.util.concurrent.*;
/**
* Simple thread pool wrapper used as the worker pool
*/
public class PlasticeneThreadExecutor {
private final ThreadPoolExecutor executor;
public PlasticeneThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveMillis, String threadNamePrefix) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10000);
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setName(threadNamePrefix + "-" + t.getId());
t.setDaemon(false);
return t;
};
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveMillis, TimeUnit.MILLISECONDS, queue, factory,
new ThreadPoolExecutor.CallerRunsPolicy());
}
public void submit(Runnable task) {
executor.submit(task);
}
public void shutdown() {
executor.shutdown();
}
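}
Distributed delay queue core: DistributedDelayQueue
File: /src/main/java/com/icoderoad/delayqueue/core/DistributedDelayQueue.java
This is the core of the system and the logic is fairly long. The full code follows, with a comment before each method to make it easier to follow.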
package com.icoderoad.delayqueue.core;
import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.config.DelayProperties;
import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.executor.DelayTaskExecutor;
import com.icoderoad.delayqueue.storage.TaskStorage;
import com.icoderoad.delayqueue.util.PlasticeneThreadExecutor;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* Core implementation of the distributed delay queue
*/
public class DistributedDelayQueue implements InitializingBean, ApplicationListener<ContextClosedEvent> {
private static final Logger logger = LoggerFactory.getLogger(DistributedDelayQueue.class);
@Resource
private Coordinator coordinator;
@Resource
private TaskStorage taskStorage;
@Resource
private RedissonClient redissonClient;
@Resource
private DelayProperties delayProperties;
// Business executors; injection is optional
@Autowired(required = false)
private List<DelayTaskExecutor> taskExecutors;
// Queue name -> DelayQueue
private final ConcurrentMap<String, DelayQueue<DelayTask>> delayMap = new ConcurrentHashMap<>();
private final Set<String> queueNameSet = new HashSet<>();
// Ids of tasks already loaded on this node; accessed from several threads, so use a concurrent set
private final Set<String> taskIdSet = ConcurrentHashMap.newKeySet();
private String nodeId;
private final AtomicBoolean running = new AtomicBoolean(false);
private final ScheduledExecutorService loadExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r); t.setName("ddq-load-executor"); return t;
});
private final ScheduledExecutorService runningExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r); t.setName("ddq-running-executor"); return t;
});
private final ScheduledExecutorService healthExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r); t.setName("ddq-health-executor"); return t;
});
private final ScheduledExecutorService removeExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r); t.setName("ddq-remove-executor"); return t;
});
private final PlasticeneThreadExecutor workExecutor = new PlasticeneThreadExecutor(
Runtime.getRuntime().availableProcessors() + 1,
Runtime.getRuntime().availableProcessors() * 5,
1000,
"delay-consumer"
);
/**
* Add a delayed task (business entry point)
* 1) Persist the task to storage (Redis etc.) first, so it survives even if this node crashes right afterwards;
* 2) If the task is due before the next pull cycle, also put it straight into the local DelayQueue so it fires on time.
*/
public void addTask(DelayTask delayTask) {
taskStorage.addTask(delayTask);
long executeTime = delayTask.getExecuteTime();
long afterTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());
if (afterTime > executeTime) {
offerTask(delayTask);
}
}
@Override
public void afterPropertiesSet() {
if (CollectionUtils.isEmpty(taskExecutors)) {
// No business executors registered, so the component does not need to start
logger.warn("no DelayTaskExecutor found, DistributedDelayQueue will not start.");
return;
}
// Collect the queue names
taskExecutors.forEach(executor -> queueNameSet.add(executor.queueName()));
// Register this node and start the heartbeat
this.nodeId = coordinator.registerNode();
coordinator.heartBeat(this.nodeId);
// Start the worker thread (scans the local DelayQueues)
running.set(true);
runningExecutor.submit(() -> {
while (running.get()) {
try {
// Drain every task that is already due, without blocking on any single queue
for (DelayQueue<DelayTask> dq : delayMap.values()) {
DelayTask task;
while ((task = dq.poll()) != null) {
executeTask(task);
}
}
// Short pause between scans; keeps trigger latency to roughly 100 ms
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// Expected on shutdownNow(): stop the loop and restore the interrupt flag
running.set(false);
Thread.currentThread().interrupt();
logger.info("DistributedDelayQueue worker interrupted, stopping");
} catch (Exception e) {
logger.error("DistributedDelayQueue worker error", e);
}
}
});
// Start the periodic pull (loads tasks that come due within the next pullPeriod)
loadExecutor.scheduleAtFixedRate(this::loadTask,
delayProperties.getPullInitialDelay(),
delayProperties.getPullPeriod(),
TimeUnit.SECONDS);
// Health check: detect offline nodes and trigger task migration
healthExecutor.scheduleWithFixedDelay(() ->
coordinator.checkClusterHealth(this::moveOfflineNodeTask),
delayProperties.getHealthInitialDelay(),
delayProperties.getHealthPeriod(),
TimeUnit.SECONDS);
// Periodically purge the history of executed tasks
removeExecutor.scheduleAtFixedRate(() -> {
long endTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(delayProperties.getRemovePeriod());
taskStorage.removeExecutedTask(0L, endTime);
}, delayProperties.getRemoveInitialDelay(),
delayProperties.getRemovePeriod(), TimeUnit.SECONDS);
logger.info("DistributedDelayQueue initialized: nodeId={}", this.nodeId);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
running.set(false);
// Unregister the node for a graceful shutdown
try {
coordinator.unRegisterNode(this.nodeId);
} catch (Exception e) {
logger.warn("unregister node failed", e);
}
healthExecutor.shutdownNow();
loadExecutor.shutdownNow();
runningExecutor.shutdownNow();
workExecutor.shutdown();
removeExecutor.shutdownNow();
delayMap.clear();
taskIdSet.clear();
queueNameSet.clear();
logger.info("DistributedDelayQueue destroyed: nodeId={}", this.nodeId);
}
/**
* Execute a task asynchronously on the thread pool; a distributed lock taken before execution ensures idempotency
*/
private void executeTask(DelayTask task) {
if (task == null) return;
workExecutor.submit(() -> {
taskIdSet.remove(task.queueTaskId());
RLock lock = redissonClient.getLock(DelayConstant.DELAY_EXECUTING_KEY_PREFIX + task.queueTaskId());
boolean locked = false;
try {
locked = lock.tryLock();
if (!locked) return;
// Remove the original task from storage
taskStorage.removeTask(task);
// Skip the task if it has already been executed
if (taskStorage.isExecuted(task)) {
logger.info("task already executed: {}", task.queueTaskId());
return;
}
// Record the execution before running the business logic
taskStorage.addExecutedTask(task);
// Find the matching executor and run it
for (DelayTaskExecutor executor : taskExecutors) {
if (Objects.equals(executor.queueName(), task.getQueueName())) {
try {
executor.run(task);
} catch (Exception e) {
// Business failure: retries, alerting, etc. could be added here (this example only logs)
logger.error("business executor error for task: {}", task.queueTaskId(), e);
}
}
}
logger.info("task execute success: {}", task.queueTaskId());
} catch (Exception e) {
logger.error("executeTask error: {}", task.queueTaskId(), e);
} finally {
if (locked) {
try { lock.unlock(); } catch (Exception ignored) {}
}
}
});
}
/**
* When nodes are detected as offline, move the tasks they were responsible for onto this node (or other active nodes)
*/
private void moveOfflineNodeTask(List<String> originNodes, List<String> deadNodes) {
if (!running.get()) return;
if (CollectionUtils.isEmpty(originNodes) || CollectionUtils.isEmpty(deadNodes)) return;
long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());
// For each offline node, scan every business queue for tasks due in [0, end] and use the hash to check whether the task had been assigned to that node
for (String deadNode : deadNodes) {
for (String queueName : queueNameSet) {
List<DelayTask> tasks = taskStorage.listTask(queueName, 0L, end);
for (DelayTask t : tasks) {
// floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE
int index = Math.floorMod(t.getTaskId().hashCode(), originNodes.size());
if (Objects.equals(originNodes.get(index), deadNode)) {
offerTask(t);
}
}
}
}
}
/**
* Periodically load from storage the tasks that come due within the next window and assign them to nodes by hash sharding
*/
private void loadTask() {
if (!running.get()) return;
logger.info("DistributedDelayQueue loadTask start: nodeId={}", this.nodeId);
List<String> activeNodes = coordinator.getActiveNodes();
if (CollectionUtils.isEmpty(activeNodes)) {
logger.warn("no active nodes found during loadTask");
return;
}
long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());
for (String queueName : queueNameSet) {
List<DelayTask> tasks = taskStorage.listTask(queueName, 0L, end);
if (tasks == null || tasks.isEmpty()) continue;
for (DelayTask t : tasks) {
if (canProcess(t, activeNodes)) {
offerTask(t);
}
}
}
}
private Boolean canProcess(DelayTask task, List<String> activeNodes) {
if (!running.get()) return false;
if (taskIdSet.contains(task.queueTaskId())) return false;
if (CollectionUtils.isEmpty(activeNodes)) return false;
// floorMod keeps the index non-negative even when hashCode() is Integer.MIN_VALUE
int index = Math.floorMod(task.getTaskId().hashCode(), activeNodes.size());
return activeNodes.get(index).equals(this.nodeId);
}
private void offerTask(DelayTask task) {
DelayQueue<DelayTask> dq = delayMap.computeIfAbsent(task.getQueueName(), k -> new DelayQueue<>());
dq.offer(task);
taskIdSet.add(task.queueTaskId());
logger.info("task offered: {} to node {}", task.queueTaskId(), this.nodeId);
}
}
Auto-configuration class (Spring Boot registers the beans automatically)
File: /src/main/java/com/icoderoad/delayqueue/autoconfig/DelayAutoConfiguration.java
package com.icoderoad.delayqueue.autoconfig;
import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.cluster.impl.RedisCoordinator;
import com.icoderoad.delayqueue.config.DelayProperties;
import com.icoderoad.delayqueue.core.DistributedDelayQueue;
import com.icoderoad.delayqueue.storage.TaskStorage;
import com.icoderoad.delayqueue.storage.impl.RedisTaskStorage;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Auto-configuration: falls back to the default Redis implementations when the user provides no custom ones
*/
@Configuration
@EnableConfigurationProperties(DelayProperties.class)
public class DelayAutoConfiguration {
@Bean
@ConditionalOnMissingBean(Coordinator.class)
public Coordinator coordinator() {
return new RedisCoordinator();
}
@Bean
@ConditionalOnMissingBean(TaskStorage.class)
public TaskStorage taskStorage() {
return new RedisTaskStorage();
}
@Bean
public DistributedDelayQueue distributedDelayQueue() {
return new DistributedDelayQueue();
}
}
Business example: order delayed-task executor
File: /src/main/java/com/icoderoad/delayqueue/example/OrderTaskExecutor.java
package com.icoderoad.delayqueue.example;
import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.executor.DelayTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Example delayed-task executor for the order business
*/
@Component
@Slf4j
public class OrderTaskExecutor implements DelayTaskExecutor {
@Override
public void run(DelayTask delayedTask) {
// Implement order-timeout handling and similar business logic here
log.info("OrderTaskExecutor running task: {}", delayedTask);
// TODO: call the order service, cancel unpaid orders, etc.
}
@Override
public String queueName() {
return "order";
}
}
Controller example: endpoint for adding a delayed task
File: /src/main/java/com/icoderoad/delayqueue/example/DelayController.java
package com.icoderoad.delayqueue.example;
import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.core.DistributedDelayQueue;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Simple example endpoint for adding delayed tasks
*/
@RestController
@RequestMapping("/delay")
public class DelayController {
@Resource
private DistributedDelayQueue distributedDelayQueue;
@PostMapping("/add-order")
public String addOrderDelayTask(String id, int minutes) {
long executeTime = System.currentTimeMillis() + minutes * 60L * 1000L;
DelayTask task = new DelayTask("order", id, executeTime);
distributedDelayQueue.addTask(task);
return "ok";
}
}
Application entry point
File: /src/main/java/com/icoderoad/delayqueue/DelayQueueApplication.java
package com.icoderoad.delayqueue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Application entry point
*/
@SpringBootApplication(scanBasePackages = "com.icoderoad")
public class DelayQueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayQueueApplication.class, args);
}
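}
Deployment and verification
- Prepare dependencies: make sure Redis is reachable and Redisson is configured correctly (configure the Redis connection in application.yml; under Spring Boot 3.x the property prefix is spring.data.redis. The Redisson starter reads the Spring Redis connection settings, or you can supply a dedicated Redisson configuration).
- Start two (or more) instances:
  - Start the service on different ports (for example --server.port=8080 and --server.port=8081) and watch the logs for node registration, heartbeats, task pulls, and task assignment.
- Add a task:
  - Call POST /delay/add-order?id=order001&minutes=3 to add a task that runs in 3 minutes.
  - If the task's executeTime falls within the next pull period, it is placed in the local DelayQueue immediately so that it fires on time.
- Verify no duplication:
  - When two nodes have both loaded the same task, executeTask uses a Redisson distributed lock (DELAY_EXECUTING_KEY_PREFIX + queueTaskId) so that only one node at a time can take the lock and execute.
  - The executed set records when each task ran; if a task is enqueued again (for example because the business changed its execution time), that record decides whether it needs to run again.
- Rebalancing when a node goes offline:
  - Once health checks are running, if a node crashes, another node detects it and takes over the tasks the offline node was responsible for (decided by the hash assignment).
- Production notes:
  - To avoid a Redis single point of failure, use Redis Cluster or master-replica with Sentinel.
  - You can swap TaskStorage in DelayAutoConfiguration for a MySQL or RocksDB implementation to handle very large data volumes.
  - Add monitoring (Prometheus) and alerting (DingTalk, WeCom) to support operations.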
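The "verify no duplication" step combines a per-task lock with an execution record. The sketch below imitates that combination in memory so the behavior can be tried without Redis. A ReentrantLock stands in for the Redisson lock and a map stands in for the executed set; ExecuteOnceSketch and tryExecute are hypothetical names, and this is an illustration under those assumptions rather than the component's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** In-memory stand-in for "distributed lock + executed record" (hypothetical names). */
class ExecuteOnceSketch {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    // taskKey -> time (ms) at which the task last ran
    private final Map<String, Long> executedAt = new ConcurrentHashMap<>();

    /** Returns true only if this call actually ran the business logic. */
    boolean tryExecute(String taskKey, long executeTime, Runnable business) {
        ReentrantLock lock = locks.computeIfAbsent(taskKey, k -> new ReentrantLock());
        if (!lock.tryLock()) {
            return false; // another worker is handling this task right now
        }
        try {
            Long ranAt = executedAt.get(taskKey);
            if (ranAt != null && executeTime <= ranAt) {
                return false; // already ran at or after the scheduled time
            }
            // Record first, then run, mirroring the order used in executeTask
            executedAt.put(taskKey, System.currentTimeMillis());
            business.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

A second attempt with the same execute time is rejected by the record, while a task rescheduled to a later execute time passes the check and runs again.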
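Conclusion and outlook
We have refactored and fully implemented a distributed delayed-task component based on Spring Boot + DelayQueue that targets three core production requirements: no loss, no duplication, and on-time triggering. The key implementation points are:
- Local DelayQueue + periodic pull: keeps triggering timely while avoiding memory build-up.
- Persistent storage (Redis ZSet): tasks are not lost when the service restarts.
- Coordinator + health checks: cover node registration, heartbeat renewal, and automatic task migration.
- Distributed lock and execution records: prevent the same task from being executed by multiple nodes and support idempotency checks.
- Graceful startup/shutdown and resource cleanup: support online scaling and node replacement.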
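The hash-based assignment behind the periodic pull and the task migration can be sketched in isolation. The version below uses Math.floorMod so that a negative hash code (including Integer.MIN_VALUE, for which Math.abs returns a negative number) still maps to a valid index. ShardingSketch and its methods are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

class ShardingSketch {
    /** Index of the owning node; floorMod keeps the result in [0, nodeCount) for any hash. */
    static int ownerIndex(int hash, int nodeCount) {
        return Math.floorMod(hash, nodeCount);
    }

    /** Node that owns the given task under the given node list. */
    static String ownerOf(String taskId, List<String> nodes) {
        return nodes.get(ownerIndex(taskId.hashCode(), nodes.size()));
    }

    /** Tasks that were assigned to deadNode under the node list as it was before the failure. */
    static List<String> orphanedTasks(List<String> taskIds, List<String> originNodes, String deadNode) {
        List<String> orphaned = new ArrayList<>();
        for (String id : taskIds) {
            if (ownerOf(id, originNodes).equals(deadNode)) {
                orphaned.add(id);
            }
        }
        return orphaned;
    }
}
```

Because every task id maps to exactly one index, each task has a single owner for a given node list. The owner changes whenever the list changes size, which is why the migration step hashes against the original list rather than the current one.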
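Possible next steps
- Monitoring and visualization (high priority)
  - Integrate Prometheus + Grafana to collect: pending task count, execution rate, failure rate, node count, and latency distribution.
  - Build a simple task management UI (Thymeleaf + Bootstrap) that supports manual retry, cancellation, and detail views.
- Storage layer extensions (medium priority)
  - For very large task volumes, provide table sharding or partitioning strategies, or persist to RocksDB, sharded MySQL, or similar.
  - Support paginated task queries and history retention policies.
- Advanced scheduling strategies (medium/low priority)
  - Support priority queues, batch dispatch of delayed tasks, merging of delayed tasks, and improved sharding algorithms.
- Failure drills (high priority)
  - Run chaos tests regularly (node offline, network jitter, Redis outage) to validate how edge cases are handled.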