Spring + asyncTool:實現復雜任務的優雅編排與高效執行
一、集成到 Spring Boot
1. 添加依賴
在項目的 pom.xml 文件中添加 asyncTool 的依賴:
<dependency>
<groupId>com.jd.platform</groupId>
<artifactId>asyncTool</artifactId>
<version>版本號</version>
</dependency>2. 配置線程池
雖然 asyncTool 內部會管理線程池,但為了更好地控制線程的使用,可以自定義線程池。以下是兩種配置方式:
1)自定義線程池
@Configuration
@EnableAsync // 開啟線程池
public class TaskExecutePool {
@Autowired
private TaskThreadPoolConfig config;
@Bean
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getCorePoolSize()); // 核心線程池大小
executor.setMaxPoolSize(config.getMaxPoolSize()); // 最大線程數
executor.setQueueCapacity(config.getQueueCapacity()); // 隊列容量
executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); // 活躍時間
executor.setThreadNamePrefix("MyExecutor-"); // 線程名字前綴
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略
executor.initialize();
return executor;
}
}2)修改原生 Spring 異步線程池的裝配
@Configuration
@EnableAsync // 開啟線程池
public class NativeAsyncTaskExecutePool implements AsyncConfigurer {
@Autowired
private TaskThreadPoolConfig config;
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(config.getCorePoolSize());
executor.setMaxPoolSize(config.getMaxPoolSize());
executor.setQueueCapacity(config.getQueueCapacity());
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
executor.setThreadNamePrefix("MyExecutor2-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, objects) -> {
log.error("==========================" + ex.getMessage() + "=======================", ex);
log.error("exception method:" + method.getName());
};
}
}二、核心方法說明
1. IWorker 接口
? action(T object, Map<String, WorkerWrapper> allWrappers):任務的具體執行邏輯。object 是任務的輸入參數,allWrappers 是所有任務的包裝類集合,可用于獲取其他任務的結果。
? defaultValue():任務超時或異常時的默認返回值。
2. ICallback 接口
? begin():任務開始時的回調。
? result(boolean success, T param, WorkResult<V> workResult):任務執行結果的回調。success 表示任務是否成功,param 是任務的輸入參數,workResult 是任務的執行結果。
3. WorkerWrapper 類
? id:任務的唯一標識。
? param:任務的輸入參數。
? worker:任務的具體實現。
? callback:任務的回調實現。
? depend:任務的依賴關系,定義任務的執行順序。
? next:任務的后續任務,用于定義任務的執行順序。
三、詳細使用方式及示例
1. 串行任務
任務按順序依次執行。以下是一個串行任務的示例:
// 定義任務 A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(new WorkerA())
.callback(new WorkerA())
.param(1)
.build();
// 定義任務 B,依賴于任務 A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(new WorkerB())
.callback(new WorkerB())
.param(2)
.depend(wrapperA)
.build();
// 定義任務 C,依賴于任務 B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(new WorkerC())
.callback(new WorkerC())
.param(3)
.depend(wrapperB)
.build();
// 提交任務
Async.beginWork(1000, wrapperA);2. 并行任務
多個任務同時執行。以下是一個并行任務的示例:
// 定義任務 A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(new WorkerA())
.callback(new WorkerA())
.param(1)
.build();
// 定義任務 B
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(new WorkerB())
.callback(new WorkerB())
.param(2)
.build();
// 定義任務 C
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(new WorkerC())
.callback(new WorkerC())
.param(3)
.build();
// 提交任務
Async.beginWork(1000, wrapperA, wrapperB, wrapperC);3. 阻塞等待 - 先串行,后并行
先執行任務 A,然后任務 B 和任務 C 并行執行:
// 定義任務 A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(new WorkerA())
.callback(new WorkerA())
.param(1)
.build();
// 定義任務 B,依賴于任務 A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(new WorkerB())
.callback(new WorkerB())
.param(2)
.depend(wrapperA)
.build();
// 定義任務 C,依賴于任務 A
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(new WorkerC())
.callback(new WorkerC())
.param(3)
.depend(wrapperA)
.build();
// 提交任務
Async.beginWork(1000, wrapperA);4. 阻塞等待 - 先并行,后串行
任務 B 和任務 C 并行執行,完成后任務 A 執行:
// 定義任務 A
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerA")
.worker(new WorkerA())
.callback(new WorkerA())
.param(null) // 參數為任務 B 和任務 C 的結果
.build();
// 定義任務 B
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerB")
.worker(new WorkerB())
.callback(new WorkerB())
.param(2)
.next(wrapperA)
.build();
// 定義任務 C
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
.id("workerC")
.worker(new WorkerC())
.callback(new WorkerC())
.param(3)
.next(wrapperA)
.build();
// 提交任務
Async.beginWork(1000, wrapperB, wrapperC);四、主要作用說明
1. 任務編排
靈活的并行與串行組合: asyncTool 支持任意組合多線程的并行和串行任務,開發者可以根據業務需求靈活定義任務的執行順序。
任務依賴管理: 它允許任務之間存在強依賴和弱依賴關系。例如,某些任務必須在其他任務完成后才能執行,而另一些任務則可以在依賴任務中的任意一個或多個完成后執行。
2. 執行監控與回調
全鏈路回調機制: 每個任務在執行過程中,無論成功、失敗、超時還是異常,都會觸發回調函數。這使得開發者可以實時監控任務的執行狀態。
任務跳過回調: 即使某些任務被跳過未執行,asyncTool 也會提供回調,方便開發者進行日志記錄或異常處理。
3. 異常處理與容錯
異常與超時處理: 每個任務可以設置超時時間和默認值,當任務執行失敗或超時時,會返回默認值,確保整個任務鏈的穩定性。
獨立任務容錯: 單個任務的失敗不會影響其他任務的回調和最終結果的獲取,但如果任務依賴的上游任務失敗,則當前任務也會失敗并返回默認值。
4. 性能優化
低線程設計: asyncTool 采用低線程設計,減少線程的創建和銷毀開銷。例如,在多個任務依賴關系中,后續任務可以復用前一個任務的線程。
無鎖機制: 整個框架全程無鎖,避免了鎖帶來的性能開銷,提高了并發性能。
5. 結果管理
按順序返回結果: 任務執行完成后,asyncTool 可以按任務添加的順序返回結果列表,方便開發者進行后續處理。
支持異步回調: 除了同步阻塞返回結果外,還支持整個任務組的異步回調,避免阻塞主線程。
6. 線程池管理
線程池共享與獨占: 支持為每個任務組獨享線程池,也可以讓所有任務組共享一個線程池,靈活配置資源。
7. 簡化開發
封裝復雜邏輯: asyncTool 封裝了復雜的并發邏輯,使得開發者可以更專注于業務邏輯的實現,而無需深入了解底層的并發機制。
五、注意事項
? 任務的線程安全: 由于任務可能在多個線程中并發執行,需要確保任務的線程安全性。
? 任務的異常處理: 在任務執行過程中可能會出現異常,需要合理地處理異常,避免影響整個應用的運行。
? 任務的超時設置: 合理設置任務的超時時間,避免任務長時間未完成導致資源浪費。
? 任務的依賴關系: 正確配置任務的依賴關系,確保任務按預期順序執行。
通過以上詳細說明和代碼示例,你可以在 Spring Boot 項目中靈活使用 asyncTool 實現復雜的多線程任務編排。



































