Spring Boot 輕量級分布式定時任務技術實現方案
作者:farerboy
本方案基于Spring Schedule實現分布式定時任務,解決上述問題,提供高可用、可擴展的任務調度能力。
一、背景與挑戰
在分布式系統中,定時任務面臨三大核心挑戰:
- 單點故障:傳統單節點任務調度存在單點故障風險
- 重復執行:多節點部署可能導致任務被重復執行
- 負載均衡:任務無法在集群節點間智能分配
本方案基于Spring Schedule實現分布式定時任務,解決上述問題,提供高可用、可擴展的任務調度能力。
二、架構設計

核心組件
- 分布式協調器:使用Redis/ZooKeeper實現節點協調
- 任務分配器:基于一致性哈希算法分配任務
- 心跳檢測:節點健康監控與故障轉移
- 任務鎖:防止任務重復執行
三、核心實現
1. 分布式鎖實現
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
 * Redis-backed mutual-exclusion lock.
 *
 * <p>Each instance generates its own random token. {@link #tryLock()} stores the
 * token under {@code lockKey} only if the key is absent, with a TTL of
 * {@code expireTime} milliseconds. {@link #unlock()} deletes the key only if it
 * still holds this instance's token, so a holder whose TTL has lapsed cannot
 * delete a lock that another instance acquired afterwards.
 *
 * <p>The lock is not reentrant: a second {@code tryLock()} on the same instance
 * while the key exists returns {@code false}.
 */
public class DistributedLock {

    /** Compare-and-delete: remove KEYS[1] only when its value equals ARGV[1]. */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end",
            Long.class);

    private final RedisTemplate<String, String> redisTemplate;
    private final String lockKey;
    private final String lockValue;
    private final long expireTime;

    /**
     * @param redisTemplate template used for all Redis calls; must not be null
     * @param lockKey       Redis key representing the lock; must not be null
     * @param expireTime    lock TTL in milliseconds; must be positive
     * @throws IllegalArgumentException if an argument violates the constraints above
     */
    public DistributedLock(RedisTemplate<String, String> redisTemplate,
                           String lockKey, long expireTime) {
        if (redisTemplate == null) {
            throw new IllegalArgumentException("redisTemplate must not be null");
        }
        if (lockKey == null) {
            throw new IllegalArgumentException("lockKey must not be null");
        }
        if (expireTime <= 0) {
            throw new IllegalArgumentException("expireTime must be positive, got " + expireTime);
        }
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        // Fully qualified because the snippet's import list does not include java.util.UUID.
        this.lockValue = java.util.UUID.randomUUID().toString();
        this.expireTime = expireTime;
    }

    /**
     * Attempts to take the lock without blocking.
     *
     * @return {@code true} if the key was absent and is now owned by this instance;
     *         {@code false} otherwise (including a {@code null} reply from the template)
     */
    public boolean tryLock() {
        Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(acquired);
    }

    /**
     * Releases the lock if, and only if, the key still holds this instance's token.
     * Calling it when the lock is not held (or has expired) is a no-op on the Redis side.
     */
    public void unlock() {
        redisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(lockKey),
                lockValue
        );
    }
}
2. 分布式任務調度器
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.concurrent.ScheduledFuture;
/**
 * Registers cron-triggered tasks on the local scheduler and wraps every run in a
 * per-task {@link DistributedLock}, so that a node only executes the task when it
 * wins the Redis lock {@code "task-" + taskName}.
 */
@Component
public class DistributedTaskScheduler {

    /** TTL, in milliseconds, of the per-execution task lock. */
    private static final long TASK_LOCK_EXPIRE_MILLIS = 30000;

    private final RedisTemplate<String, String> redisTemplate;
    private final ScheduledTaskRegistrar taskRegistrar;
    /** Handles of the tasks scheduled on this node, keyed by task name. */
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();

    public DistributedTaskScheduler(RedisTemplate<String, String> redisTemplate,
                                    ScheduledTaskRegistrar taskRegistrar) {
        this.redisTemplate = redisTemplate;
        this.taskRegistrar = taskRegistrar;
    }

    /**
     * Schedules {@code task} under {@code taskName} with the given cron expression.
     * If a task with the same name is already scheduled on this node, it is replaced
     * and the previous schedule is cancelled, so the task never fires twice per tick.
     *
     * @param taskName logical task name; also used to derive the lock key
     * @param task     work to run when this node wins the lock
     * @param cron     cron expression passed to {@link CronTrigger}
     * @throws IllegalStateException if the registrar has no scheduler configured
     */
    public void scheduleTask(String taskName, Runnable task, String cron) {
        if (taskRegistrar.getScheduler() == null) {
            throw new IllegalStateException(
                    "ScheduledTaskRegistrar has no TaskScheduler configured");
        }
        ScheduledFuture<?> future = taskRegistrar.getScheduler().schedule(
                () -> executeWithLock(taskName, task),
                new CronTrigger(cron)
        );
        if (future == null) {
            // The scheduler returns null when the trigger will never fire;
            // ConcurrentHashMap cannot store null, so just drop any old schedule.
            cancelTask(taskName);
            return;
        }
        ScheduledFuture<?> previous = scheduledTasks.put(taskName, future);
        if (previous != null) {
            // Do not interrupt a run that is already in progress; only stop future firings.
            previous.cancel(false);
        }
    }

    /**
     * Runs {@code task} only if the per-task lock can be acquired, and releases the
     * lock afterwards even when the task throws.
     *
     * NOTE(review): the lock TTL is fixed; a run that outlasts it lets another node
     * acquire the lock while this run is still executing — confirm this is acceptable.
     */
    private void executeWithLock(String taskName, Runnable task) {
        DistributedLock taskLock = new DistributedLock(
                redisTemplate, "task-" + taskName, TASK_LOCK_EXPIRE_MILLIS);
        if (taskLock.tryLock()) {
            try {
                task.run();
            } finally {
                taskLock.unlock();
            }
        }
    }

    /**
     * Cancels the task scheduled under {@code taskName} on this node, interrupting a
     * run in progress. Unknown names are ignored.
     */
    public void cancelTask(String taskName) {
        // remove() is atomic, so two concurrent cancels cannot both act on one future.
        ScheduledFuture<?> future = scheduledTasks.remove(taskName);
        if (future != null) {
            future.cancel(true);
        }
    }
}
3. 節點注冊與發現
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Tracks cluster membership in the Redis sorted set {@code distributed:nodes}.
 * Each member is a node id ({@code host:port}) scored with the epoch-millis of its
 * latest heartbeat.
 */
@Component
public class NodeRegistry {

    private static final String NODES_KEY = "distributed:nodes";
    /** Heartbeat period in milliseconds. */
    private static final long HEARTBEAT_INTERVAL = 5000;

    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Assigned in {@link #init()}, not in the constructor: {@code port} is a
     * field-injected {@code @Value} and is still 0 while the constructor runs.
     */
    private String nodeId;

    @Value("${server.port}")
    private int port;
    @Value("${spring.application.name}")
    private String appName;

    public NodeRegistry(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Derives this node's id from the local host name and the injected port, then
     * registers the node.
     *
     * @throws IllegalStateException if the local host name cannot be resolved
     */
    @PostConstruct
    public void init() {
        this.nodeId = resolveHostName() + ":" + port;
        registerNode();
    }

    /** @return this node's id ({@code host:port}), or {@code null} before {@link #init()} has run */
    public String getNodeId() {
        return nodeId;
    }

    /** Refreshes this node's heartbeat score. */
    @Scheduled(fixedRate = HEARTBEAT_INTERVAL)
    public void heartbeat() {
        touch();
    }

    private void registerNode() {
        touch();
    }

    /**
     * Writes the current time as this node's score and renews the key's TTL.
     * The TTL must be renewed on every beat: adding a member does not extend it, so
     * a TTL set only at registration would delete the whole set, live nodes included.
     */
    private void touch() {
        redisTemplate.opsForZSet().add(NODES_KEY, nodeId, System.currentTimeMillis());
        redisTemplate.expire(NODES_KEY, HEARTBEAT_INTERVAL * 3, TimeUnit.MILLISECONDS);
    }

    /**
     * Removes members whose last heartbeat is older than two heartbeat intervals and
     * returns the remaining node ids.
     *
     * @return the active node ids; never {@code null}
     */
    public Set<String> getActiveNodes() {
        long now = System.currentTimeMillis();
        long cutoff = now - HEARTBEAT_INTERVAL * 2;
        // Evict expired nodes.
        redisTemplate.opsForZSet().removeRangeByScore(NODES_KEY, 0, cutoff);
        Set<String> active = redisTemplate.opsForZSet().range(NODES_KEY, 0, -1);
        return active != null ? active : java.util.Collections.<String>emptySet();
    }

    /**
     * Reports whether this node is the leader, defined as the active node with the
     * lexicographically smallest id. The set's own order is by heartbeat time and
     * changes on every beat, so it cannot be used for a stable choice.
     */
    public boolean isLeader() {
        return getActiveNodes().stream()
                .min(String::compareTo)
                .map(first -> first.equals(nodeId))
                .orElse(false);
    }

    private static String resolveHostName() {
        try {
            return java.net.InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            throw new IllegalStateException(
                    "Cannot resolve local host name for node registration", e);
        }
    }
}
4. 任務分配策略(一致性哈希)
import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;
/**
 * Consistent-hash ring mapping string keys to nodes of type {@code T}.
 *
 * <p>Every node occupies {@code numberOfReplicas} points on the ring, each derived
 * from {@code node.toString() + i}. A key is served by the first point at or after
 * the key's hash, wrapping to the first point of the ring.
 *
 * <p>Not thread-safe: the ring is an unsynchronized {@link TreeMap}.
 *
 * @param <T> node type; {@code toString()} must be stable and unique per node
 */
public class ConsistentHash<T> {

    private final SortedMap<Integer, T> circle = new TreeMap<>();
    private final int numberOfReplicas;

    /**
     * @param nodes            initial nodes to place on the ring
     * @param numberOfReplicas ring points per node; must be at least 1
     * @throws IllegalArgumentException if {@code numberOfReplicas < 1}
     */
    public ConsistentHash(Collection<T> nodes, int numberOfReplicas) {
        if (numberOfReplicas < 1) {
            // With zero points the ring stays empty and every lookup would return null.
            throw new IllegalArgumentException(
                    "numberOfReplicas must be >= 1, got " + numberOfReplicas);
        }
        this.numberOfReplicas = numberOfReplicas;
        for (T node : nodes) {
            addNode(node);
        }
    }

    /** Places all of {@code node}'s points on the ring. */
    public void addNode(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hash(node.toString() + i), node);
        }
    }

    /**
     * Removes {@code node}'s points from the ring. A point is removed only when it
     * is currently mapped to {@code node}, so a hash collision with another node
     * cannot delete that other node's point.
     */
    public void removeNode(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hash(node.toString() + i), node);
        }
    }

    /**
     * @param key key to route
     * @return the node owning {@code key}, or {@code null} when the ring is empty
     */
    public T getNode(String key) {
        if (circle.isEmpty()) {
            return null;
        }
        // tailMap is inclusive of its bound, so an exact hit is found here as well.
        SortedMap<Integer, T> tailMap = circle.tailMap(hash(key));
        Integer point = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        return circle.get(point);
    }

    private int hash(String key) {
        return Hashing.murmur3_32().hashString(key, StandardCharsets.UTF_8).asInt();
    }
}
四、深度優化方案
1. 任務分片執行
/**
 * Scheduled job that walks every data shard and processes only the shards that the
 * consistent-hash ring assigns to the local node.
 */
public class ShardedTask {

    private final ConsistentHash<String> consistentHash;
    private final NodeRegistry nodeRegistry;

    /**
     * @param consistentHash ring used to map a shard to its owning node id
     * @param nodeRegistry   source of the local node id
     */
    public ShardedTask(ConsistentHash<String> consistentHash, NodeRegistry nodeRegistry) {
        this.consistentHash = consistentHash;
        this.nodeRegistry = nodeRegistry;
    }

    /** Runs on the cron schedule below and processes the locally-owned shards. */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void executeShardedTask() {
        // The local id does not change during one pass; read it once.
        String localNodeId = nodeRegistry.getNodeId();
        // NOTE(review): getDataShards() is not defined in this snippet — confirm its source.
        List<String> shards = getDataShards();
        for (String shard : shards) {
            String assignedNode = consistentHash.getNode(shard);
            if (localNodeId.equals(assignedNode)) {
                processShard(shard);
            }
        }
    }

    private void processShard(String shard) {
        // Process the data belonging to this shard.
    }
}
2. 故障轉移機制
/**
 * Every 10 seconds, inspects a snapshot of the locally scheduled tasks and hands
 * each one whose future is cancelled or done to {@code reassignTask}.
 *
 * NOTE(review): a finished future is not removed from {@code scheduledTasks} here,
 * so the same task appears to be handed over again on every pass — confirm intended.
 */
@Scheduled(fixedRate = 10000)
public void checkTaskHealth() {
    // Work on a copy so the pass sees a fixed view of the registry.
    Map<String, ScheduledFuture<?>> snapshot = new HashMap<>(scheduledTasks);
    snapshot.forEach((name, handle) -> {
        if (handle.isCancelled() || handle.isDone()) {
            reassignTask(name);
        }
    });
}
/**
 * Reassigns {@code taskName} to a newly selected node. Only the leader acts;
 * every other node returns immediately.
 */
private void reassignTask(String taskName) {
    if (!nodeRegistry.isLeader()) {
        return;
    }
    // Leader node picks the new owner and notifies it.
    String target = selectNewNode(taskName);
    sendTaskAssignment(target, taskName);
}
3. 任務狀態持久化
/**
 * Persistent record of one task run: which task, on which node, when it started
 * and ended, its status, and the error message if it failed.
 *
 * NOTE(review): {@code status} has no {@code @Enumerated} mapping, so the provider's
 * default enum mapping applies — confirm that is the intended column format.
 * NOTE(review): other snippets call {@code getCronExpression()}, but no such field
 * is declared here; it is not added because that would change the table schema.
 */
@Entity
public class TaskExecution {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String taskName;
    private String nodeId;
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    private TaskStatus status;
    private String errorMessage;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public String getNodeId() {
        return nodeId;
    }

    public void setNodeId(String nodeId) {
        this.nodeId = nodeId;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }

    /** @return the end time, or {@code null} if it has not been set */
    public LocalDateTime getEndTime() {
        return endTime;
    }

    public void setEndTime(LocalDateTime endTime) {
        this.endTime = endTime;
    }

    public TaskStatus getStatus() {
        return status;
    }

    public void setStatus(TaskStatus status) {
        this.status = status;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }
}
/** Lifecycle states recorded for a task execution. Declaration order is significant. */
public enum TaskStatus {
    /** Queued to run again. */
    PENDING,
    /** Execution has started and not yet finished. */
    RUNNING,
    /** Execution finished without throwing. */
    COMPLETED,
    /** Execution ended with an exception. */
    FAILED
}
/**
 * Every 30 seconds (measured from the end of the previous pass), reschedules each
 * FAILED execution that ended more than five minutes ago and marks it PENDING.
 *
 * Records without an end time are skipped, so one incomplete row cannot abort the
 * whole pass with a NullPointerException.
 *
 * NOTE(review): nothing here bounds the number of retries for a task — confirm
 * whether a retry limit should be enforced.
 */
@Scheduled(fixedDelay = 30000)
public void recoverFailedTasks() {
    // One cutoff per pass, so every record is judged against the same instant.
    LocalDateTime cutoff = LocalDateTime.now().minusMinutes(5);
    List<TaskExecution> failedTasks = taskRepository.findByStatus(TaskStatus.FAILED);
    for (TaskExecution task : failedTasks) {
        LocalDateTime endedAt = task.getEndTime();
        if (endedAt == null || !endedAt.isBefore(cutoff)) {
            continue;
        }
        scheduleTask(task.getTaskName(), getTaskRunnable(task), task.getCronExpression());
        task.setStatus(TaskStatus.PENDING);
        taskRepository.save(task);
    }
}
五、使用場景
1. 金融系統對賬任務
/**
 * Declares the reconciliation job and registers it with the distributed scheduler
 * to run every day at 02:00.
 */
@Configuration
public class ReconciliationConfig {

    /** @return the reconciliation job body */
    @Bean
    public Runnable reconciliationTask() {
        return () -> {
            // 1. Fetch orders that have not been reconciled
            // 2. Run the reconciliation logic
            // 3. Generate the reconciliation report
            // 4. Send notifications
        };
    }

    /**
     * Registers the reconciliation job with {@code scheduler}.
     *
     * This is an injected method rather than a {@code @PostConstruct} callback
     * because lifecycle callbacks must be no-arg, while an {@code @Autowired} method
     * receives its arguments from the container.
     *
     * NOTE(review): this calls the {@code @Bean} method from an injection callback of
     * the same configuration class — confirm it behaves as expected on the Spring
     * version in use.
     */
    @Autowired
    public void init(DistributedTaskScheduler scheduler) {
        scheduler.scheduleTask("daily-reconciliation",
                reconciliationTask(), "0 0 2 * * ?"); // runs every day at 02:00
    }
}
2. 電商庫存同步
/**
 * Every five minutes, pushes the inventory to the external system page by page
 * (100 items per page), but only on the node that wins the {@code inventory-sync}
 * lock. The lock is released when the sync ends, whether it succeeds or throws.
 */
@Scheduled(cron = "0 */5 * * * ?")
public void syncInventory() {
    DistributedLock lock = new DistributedLock(redisTemplate, "inventory-sync", 300000);
    if (!lock.tryLock()) {
        return;
    }
    try {
        // Walk the pages until an empty one signals the end of the data.
        for (int page = 0; ; page++) {
            List<Inventory> batch = inventoryService.getInventoryPage(page, 100);
            if (batch.isEmpty()) {
                break;
            }
            for (Inventory entry : batch) {
                syncToExternalSystem(entry);
            }
        }
    } finally {
        lock.unlock();
    }
}
3. 日志分析任務
/**
 * Daily job that analyses the previous day's log files, each node handling only
 * the files that the consistent-hash ring assigns to it.
 */
public class LogAnalysisJob {

    private final ConsistentHash<String> logHash;
    private final NodeRegistry nodeRegistry;

    /**
     * @param logHash      ring used to map a log file to its owning node id
     * @param nodeRegistry source of the local node id
     */
    public LogAnalysisJob(ConsistentHash<String> logHash, NodeRegistry nodeRegistry) {
        this.logHash = logHash;
        this.nodeRegistry = nodeRegistry;
    }

    /** Runs every day at 04:00 and analyses yesterday's locally-owned log files. */
    @Scheduled(cron = "0 0 4 * * ?") // runs every day at 04:00
    public void analyzeLogs() {
        LocalDate date = LocalDate.now().minusDays(1);
        // The local id does not change during one pass; read it once.
        String localNodeId = nodeRegistry.getNodeId();
        // NOTE(review): findLogFiles(...) is not defined in this snippet — confirm its source.
        List<String> logFiles = findLogFiles(date);
        for (String file : logFiles) {
            String node = logHash.getNode(file);
            if (localNodeId.equals(node)) {
                analyzeFile(file);
            }
        }
    }

    private void analyzeFile(String filePath) {
        // Analyse this node's share of the log files.
    }
}
六、性能優化策略
1. 任務負載均衡
/**
 * Publishes this node's load into the Redis sorted set {@code node-load} and picks
 * the member with the lowest score when a task needs a node.
 *
 * NOTE(review): {@code redisTemplate}, {@code nodeId} and {@code calculateSystemLoad()}
 * are not declared in this snippet — confirm where they come from.
 */
public class LoadBalancer {

    /** Sorted set holding one entry per node, scored by its reported load. */
    private static final String NODE_LOAD_KEY = "node-load";

    private final Map<String, Integer> nodeLoad = new ConcurrentHashMap<>();

    /** Every 5 seconds, stores the current system load as this node's score. */
    @Scheduled(fixedRate = 5000)
    public void updateNodeLoad() {
        double currentLoad = calculateSystemLoad();
        redisTemplate.opsForZSet().add(NODE_LOAD_KEY, nodeId, currentLoad);
    }

    /**
     * @param taskType task category (not consulted by this implementation)
     * @return the id of the lowest-scored node, or {@code null} if none is recorded
     */
    public String selectNodeForTask(String taskType) {
        // Rank 0 of an ascending sorted set is the least-loaded node.
        Set<ZSetOperations.TypedTuple<String>> lowest = redisTemplate.opsForZSet()
                .rangeWithScores(NODE_LOAD_KEY, 0, 0);
        if (lowest == null || lowest.isEmpty()) {
            return null;
        }
        return lowest.iterator().next().getValue();
    }
}
2. 任務執行監控
/**
 * Around-advice for {@code @Scheduled} methods that records every run as a
 * {@link TaskExecution}: RUNNING before the call, then COMPLETED or FAILED with an
 * end time afterwards.
 */
@Aspect
@Component
public class TaskMonitoringAspect {

    @Autowired
    private TaskExecutionRepository repository;

    /**
     * Wraps one scheduled invocation with execution bookkeeping.
     *
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws, rethrown unchanged
     */
    @Around("@annotation(scheduled)")
    public Object monitorTask(ProceedingJoinPoint joinPoint, Scheduled scheduled) throws Throwable {
        String taskName = getTaskName(joinPoint);
        TaskExecution execution = new TaskExecution();
        execution.setTaskName(taskName);
        execution.setStartTime(LocalDateTime.now());
        execution.setStatus(TaskStatus.RUNNING);
        execution = repository.save(execution);
        try {
            Object result = joinPoint.proceed();
            execution.setStatus(TaskStatus.COMPLETED);
            return result;
        } catch (Throwable failure) {
            // proceed() is declared to throw Throwable; catching only Exception would
            // leave runs that end with an Error recorded as RUNNING forever.
            execution.setStatus(TaskStatus.FAILED);
            String message = failure.getMessage();
            // Fall back to the type name so a message-less failure still says what it was.
            execution.setErrorMessage(message != null ? message : failure.getClass().getName());
            throw failure;
        } finally {
            execution.setEndTime(LocalDateTime.now());
            repository.save(execution);
        }
    }
}
3. 動態任務調整
/**
 * REST endpoints under {@code /tasks} for scheduling and cancelling named tasks at
 * runtime.
 *
 * NOTE(review): {@code taskRegistry} is not declared in this snippet — confirm where
 * it comes from.
 */
@RestController
@RequestMapping("/tasks")
public class TaskController {

    @Autowired
    private DistributedTaskScheduler scheduler;

    /**
     * Schedules the registered task {@code taskName} with the cron expression taken
     * from the request body.
     *
     * @return a plain-text outcome message
     */
    @PostMapping("/{taskName}/schedule")
    public String scheduleTask(@PathVariable String taskName,
                               @RequestBody ScheduleRequest request) {
        Runnable registered = taskRegistry.getTask(taskName);
        if (registered == null) {
            return "Task not found";
        }
        scheduler.scheduleTask(taskName, registered, request.getCron());
        return "Task scheduled successfully";
    }

    /**
     * Cancels the task {@code taskName} on this node.
     *
     * @return a plain-text confirmation, sent whether or not the task existed
     */
    @PostMapping("/{taskName}/cancel")
    public String cancelTask(@PathVariable String taskName) {
        scheduler.cancelTask(taskName);
        return "Task canceled";
    }
}
七、部署架構

八、生產環境配置
# application-prod.yml
spring:
  redis:
    cluster:
      nodes: redis1:6379,redis2:6379,redis3:6379
    timeout: 3000
distributed:
  tasks:
    heartbeat-interval: 5000
    lock-expire: 30000
    max-task-retries: 3
    shard-replicas: 100
management:
  endpoints:
    web:
      exposure:
        include: health,info,tasks
  endpoint:
    tasks:
      enabled: true
九、總結與展望
方案優勢
- 無縫集成:基于Spring Schedule,無需引入額外調度框架
- 高可用性:自動故障轉移,無單點故障
- 彈性擴展:節點動態加入/退出,自動重新分配任務
- 精確控制:細粒度任務管理API
- 可視化監控:完整任務執行歷史記錄
未來演進
- AI驅動的任務調度:基于歷史數據預測任務執行時間
- 跨集群任務協調:支持多數據中心任務調度
- 資源感知調度:根據節點資源使用情況分配任務
- 任務優先級系統:支持高優先級任務搶占
責任編輯:武曉燕
來源:
小林聊編程



































