輕量分布式定時任務:Spring Boot 時間輪方案
引言
在分布式系統中,定時任務是常見的需求,比如數據同步、訂單超時處理、日志清理等。傳統的定時任務方案如Quartz雖然功能強大,但在高并發、輕量級場景下,存在配置復雜、資源占用較高的問題。而Spring Boot自帶的@Scheduled注解僅適用于單機環境,無法滿足分布式部署的需求。
時間輪算法是什么
時間輪(Time Wheel)是一種高效的定時任務調度模型,靈感來源于時鐘的輪盤結構。它將時間劃分為多個槽位(Slot),每個槽位對應一個時間區間(如1秒、10秒),并維護一個任務鏈表。
- 單級時間輪:類似鐘表,有多個槽位(如
60個,對應60秒),指針每秒轉動一次,指向當前需要執行的槽位,然后執行該槽位下的所有任務。 - 多級時間輪:當任務的延遲時間超過單級時間輪的最大范圍時(如單級最大
60秒,任務延遲1小時),會通過級聯機制,將任務先放入高級時間輪,當時間接近時,再降級到低級時間輪,最終觸發執行。
時間輪的優勢在于:
O (1)時間復雜度:任務的添加、刪除、觸發均為常數時間,效率遠高于定時器鏈表(O (n))。- 低資源占用:通過批量觸發減少線程喚醒頻率,降低
CPU消耗。
實現
整體架構
本方案的架構分為三層,自上而下分別是:
- 任務提交層:提供
API接口,接收業務系統提交的定時任務(如延遲10秒執行訂單超時處理)。 - 時間輪調度層:基于
Netty HashedWheelTimer實現,負責按時間觸發任務,并通過Redis鎖保證分布式唯一性。 - 任務執行層:接收調度層的觸發信號,執行具體的業務邏輯(如調用訂單關閉接口)。
[業務系統] → [任務提交API] → [Redis存儲任務] → [時間輪調度器] → [Redis分布式鎖] → [任務執行器]時間輪參數設計
以Netty HashedWheelTimer為例,核心參數需根據業務場景調整:
tickDuration:時間輪指針轉動一次的時間(如1秒),決定任務觸發的精度。ticksPerWheel:時間輪的槽位數量(如60),單級時間輪的最大覆蓋時間 =tickDuration × ticksPerWheel(如60秒)。threadFactory:線程工廠,用于創建時間輪的工作線程。
示例:
// 時間輪配置:1秒/ tick,60個槽位,最大覆蓋60秒
HashedWheelTimer timer = new HashedWheelTimer(
Executors.defaultThreadFactory(), // 線程工廠
Duration.ofSeconds(1), // 每1秒轉動一次
TimeUnit.SECONDS,
60 // 60個槽位
);核心依賴
<!-- Redis 與 Redisson(分布式鎖) -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.6</version>
</dependency>
<!-- Netty 時間輪(高性能) -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.82.Final</version>
</dependency>
<!-- 工具類 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>分布式鎖工具類(Redisson)
@Component
public class RedissonLockUtil {
@Resource
private RedissonClient redissonClient;
/**
* 嘗試獲取分布式鎖
* @param lockKey 鎖的key(建議用任務ID)
* @param waitTime 等待時間(毫秒)
* @param leaseTime 鎖的自動釋放時間(毫秒)
* @return 是否獲取成功
*/
public boolean tryLock(String lockKey, long waitTime, long leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
// 嘗試獲取鎖,等待waitTime毫秒,獲取成功后leaseTime毫秒自動釋放
return lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
returnfalse;
}
}
/**
* 釋放分布式鎖
* @param lockKey 鎖的key
*/
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}任務模型與 Redis 存儲
任務模型
@Data
public class ScheduledTask implements Serializable {
// 任務唯一ID(UUID生成)
private String taskId;
// 任務類型(如ORDER_TIMEOUT:訂單超時)
private String taskType;
// 任務參數(JSON格式,如{"orderId": "123456"})
private String taskParam;
// 任務延遲時間(毫秒)
private long delayMs;
// 任務提交時間
private Date submitTime;
// 任務預計執行時間
private Date executeTime;
}Redis 存儲工具類
@Component
public class TaskRedisUtil {
@Resource
private StringRedisTemplate stringRedisTemplate;
// Redis key前綴(避免key沖突)
private static final String TASK_KEY_PREFIX = "scheduled:task:";
/**
* 存儲任務到Redis
* @param task 任務對象
* @param expireMs 過期時間(毫秒,避免Redis內存溢出)
*/
public void saveTask(ScheduledTask task, long expireMs) {
String key = TASK_KEY_PREFIX + task.getTaskId();
String taskJson = JSONUtil.toJsonStr(task);
stringRedisTemplate.opsForValue().set(key, taskJson, expireMs, TimeUnit.MILLISECONDS);
}
/**
* 從Redis獲取任務
* @param taskId 任務ID
* @return 任務對象(null表示不存在)
*/
public ScheduledTask getTask(String taskId) {
String key = TASK_KEY_PREFIX + taskId;
String taskJson = stringRedisTemplate.opsForValue().get(key);
if (taskJson == null) {
return null;
}
return JSONUtil.toBean(taskJson, ScheduledTask.class);
}
/**
* 從Redis刪除任務(任務執行完成后)
* @param taskId 任務ID
*/
public void deleteTask(String taskId) {
String key = TASK_KEY_PREFIX + taskId;
stringRedisTemplate.delete(key);
}
}時間輪調度器實現
基于Netty HashedWheelTimer實現時間輪調度器,核心邏輯是:
- 接收任務,計算任務的延遲時間。
- 將任務提交到時間輪,設置延遲觸發。
- 任務觸發時,先獲取
Redis分布式鎖,確保唯一執行。 - 執行任務邏輯,執行完成后刪除
Redis中的任務和鎖。
@Component
public class TimeWheelScheduler {
// 時間輪實例(單例)
private HashedWheelTimer hashedWheelTimer;
// 任務執行線程池(避免任務執行阻塞時間輪)
private ExecutorService taskExecutor;
@Resource
private RedissonLockUtil redissonLockUtil;
@Resource
private TaskRedisUtil taskRedisUtil;
@Resource
private TaskExecutor taskExecutorService; // 自定義任務執行器
// 初始化時間輪和線程池
@PostConstruct
public void init() {
// 時間輪配置:1秒/tick,60個槽位,最大覆蓋60秒
hashedWheelTimer = new HashedWheelTimer(
Executors.defaultThreadFactory(),
1000, // tickDuration:1000毫秒(1秒)
TimeUnit.MILLISECONDS,
60 // ticksPerWheel:60個槽位
);
// 任務執行線程池:核心線程數=CPU核心數,最大線程數=2*CPU核心數
int corePoolSize = Runtime.getRuntime().availableProcessors();
taskExecutor = Executors.newFixedThreadPool(corePoolSize * 2);
}
/**
* 提交任務到時間輪
* @param task 定時任務
*/
public void submitTask(ScheduledTask task) {
// 1. 計算任務延遲時間(如果任務已過期,直接執行)
long delayMs = task.getDelayMs();
if (delayMs < 0) {
delayMs = 0;
}
// 2. 存儲任務到Redis(過期時間=延遲時間+30秒,避免任務未執行就被刪除)
taskRedisUtil.saveTask(task, delayMs + 30 * 1000);
// 3. 將任務提交到時間輪
hashedWheelTimer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 4. 任務觸發時,提交到線程池執行(避免阻塞時間輪)
taskExecutor.submit(() -> executeTask(task));
}
}, delayMs, TimeUnit.MILLISECONDS);
}
/**
* 執行任務(帶分布式鎖)
* @param task 定時任務
*/
private void executeTask(ScheduledTask task) {
String taskId = task.getTaskId();
String lockKey = "scheduled:lock:" + taskId;
try {
// 1. 嘗試獲取分布式鎖(等待1秒,自動釋放30秒)
boolean lockSuccess = redissonLockUtil.tryLock(lockKey, 1000, 30 * 1000);
if (!lockSuccess) {
// 鎖獲取失敗,說明其他節點已執行該任務
return;
}
// 2. 再次從Redis獲取任務(避免任務已被刪除)
ScheduledTask redisTask = taskRedisUtil.getTask(taskId);
if (redisTask == null) {
return;
}
// 3. 執行具體的業務邏輯(調用自定義任務執行器)
taskExecutorService.execute(redisTask);
// 4. 任務執行完成,刪除Redis中的任務
taskRedisUtil.deleteTask(taskId);
} catch (Exception e) {
// 任務執行失敗,可記錄日志并進行重試
} finally {
// 5. 釋放分布式鎖
redissonLockUtil.unlock(lockKey);
}
}
// 銷毀時間輪和線程池(應用關閉時)
@PreDestroy
public void destroy() {
if (hashedWheelTimer != null) {
hashedWheelTimer.stop();
}
if (taskExecutor != null) {
taskExecutor.shutdown();
}
}
}任務執行器(業務邏輯)
@Component
public class OrderTimeoutTaskExecutor implements TaskExecutor {
// 任務類型(與ScheduledTask的taskType對應)
public static final String TASK_TYPE = "ORDER_TIMEOUT";
@Override
public void execute(ScheduledTask task) {
// 1. 校驗任務類型
if (!TASK_TYPE.equals(task.getTaskType())) {
return;
}
// 2. 解析任務參數(JSON → Map)
String taskParam = task.getTaskParam();
Map<String, String> paramMap = JSONUtil.toBean(taskParam, Map.class);
String orderId = paramMap.get("orderId");
if (orderId == null) {
return;
}
// 3. 執行訂單超時邏輯(如關閉訂單、退還庫存等)
System.out.println("執行訂單超時處理:orderId=" + orderId);
// TODO: 調用訂單服務的關閉接口
}
}任務執行器工廠(根據任務類型選擇執行器)
@Component
public class TaskExecutorFactory {
// 存儲任務類型與執行器的映射
private final Map<String, TaskExecutor> executorMap = new HashMap<>();
// 注入所有TaskExecutor實現類(Spring會自動收集)
@Resource
public void setExecutorMap(Map<String, TaskExecutor> executorMap) {
this.executorMap.clear();
// 遍歷所有執行器,將“任務類型”作為key存入map
for (TaskExecutor executor : executorMap.values()) {
if (executor instanceof OrderTimeoutTaskExecutor) {
this.executorMap.put(OrderTimeoutTaskExecutor.TASK_TYPE, executor);
}
// TODO: 其他任務類型的執行器(如日志清理)
}
}
/**
* 根據任務類型獲取執行器
* @param taskType 任務類型
* @return 任務執行器(null表示無匹配)
*/
public TaskExecutor getExecutor(String taskType) {
return executorMap.get(taskType);
}
}


































