轉轉基于MQ的分布式重試框架設計方案
1 背景
在分布式場景下,為了保障系統的可用性和數據的最終一致性,采用基于消息隊列(MQ)的重試機制是一種常見的解決方案。偽代碼如下:
/**
* 需要保證最終一致性的函數
*/
public void doSomething(Object args) {
try {
// 執行事務的操作
executeTransaction();
// 提交事務
commitTransaction();
} catch (Exception e) {
// 回滾事務
rollbackTransaction();
// 記錄日志
log.error(e);
// 序列化參數
byte[] body = serialize(args);
// 構建消息, 指定Topic、Body
Message msg = new Message("doSomethingTopic", body);
// 發送失敗重試消息
mq.send(msg);
}
}
/**
* 消費者,用于失敗重試處理
*/
@Consumer(topic = "doSomethingTopic")
public void consume(Message msg) {
// 反序列化
Object args = msg.deserialize();
// 重試
doSomething(args);
}在上述示例中,我們需要編寫一系列與業務無關的代碼來實現業務邏輯的重試機制。為了減輕開發人員的負擔并讓其專注于核心業務,我們可以對這些無關代碼進行抽象和優化,以提高開發效率和代碼質量。
2 方案
通過如下步驟,我們對重試邏輯進行了封裝,開發人員只需要在需要保證最終一致性的函數上標注一個重試注解,便擁有基于MQ的分布式重試能力。
1. 使用注解與AOP: 通過使用注解與面向切面編程(AOP)的技術,將重試邏輯模塊與業務代碼解耦。開發人員可以在需要保證最終一致性的業務方法上添加注解,通過AOP將重試邏輯應用到目標方法中,從而自動觸發重試機制。
2. 提供配置化選項:為重試邏輯提供可配置化的選項,例如設置最大重試次數、重試間隔時間等。這樣,開發人員可以根據具體業務需求進行調整,而無需修改代碼。
3. 異常處理和日志記錄:在重試邏輯中合理地處理異常,并在必要時記錄相關日志。這樣可以幫助開發人員及時發現問題并進行排查。
4. 提供可視化監控工具:開發一個可視化的監控工具,用于實時跟蹤重試操作和相關指標。這樣可以幫助開發人員更好地理解重試的執行情況,并進行故障排查和性能優化。
圖片
3 效果
我們引入了@MQRetry注解用于標記業務邏輯函數,一旦該函數發生異常,該注解會將服務名、類的完整名稱、方法名稱以及實際參數列表發送到消息隊列(MQ)中。同時系統會注冊一個MQ消費者來消費這些消息,并進行重試處理。
舉個例子,假設我們有一個名為doSomething的函數,它包含了需要保證最終一致性執行的業務邏輯。僅需在該函數上添加@MQRetry注解,當函數出現異常時,框架會自動發送一條MQ重試消息。這條消息可以被當前服務的任意一臺服務器消費,并重新執行doSomething函數。
@Service
class Service {
@MQRetry
public void doSomething(String params1, String params2, List<String> params3) {
//throw new RuntimeException(); 拋異常將重試
//RetryContext.markRetryLater(); 標記為需要下次重試
//int retryCount = RetryContext.getRetryCount(); 獲取重試次數
}
}
@Controller
class Controller {
@Autowired
private Service service;
service.doSomething("1", "2", Arrays.asList("3", "4"));
}4 可選項
除此之外,我們還為開發人員提供了一些可選項,提供一些可配置的能力。
/**
* 基于MQ的分布式重試組件
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MQRetry {
/**
* 最大重試次數,默認與上限為16次
*/
int maxAttempts() default 16;
/**
* 忽略的異常類列表,默認所有異常都重試
*/
Class<? extends Throwable>[] exclude() default {};
/**
* 需要重試的異常類列表,默認所有異常都重試
*/
Class<? extends Throwable>[] include() default {};
/**
* 出現異常時的處理函數, 格式: Bean名.方法名. 如: smsService.onError
* 也可以只設置函數名, 不設置Bean名將執行本類的函數. 如: onError
* 要求函數參數必須與重試函數的參數完全一致
*/
String errorHandler() default "";
/**
* true: 第一次調用時, 同步執行@MQRetry函數, 如果失敗再使用MQ
* false: 調用@MQRetry函數時, 只會發送MQ
*/
boolean firstSyncCall() default true;
/**
* 消費線程數,默認為20個
*/
int consumeThread() default 20;
}5 注意事項
- 適用于異步場景,重試函數不要設置返回值,函數的返回值將不會有任何的實際意義。
- At lease Once保證,重試函數需要保證冪等。
- 使用了AOP代理實現,因此,@Transactional的注意事項同樣適用于@MQRetry,如this調用、private函數、final函數會導致重試失效。
- 如果重試函數需要增加參數,請在函數參數最后位置添加。歷史消息消費時對應參數將填充為null。
6 總結
在計算機領域中,重試機制的重要性不言而喻。它通常分為兩種模式:客戶端模式和服務端模式。客戶端模式簡單易用,但可靠性較低;而服務端模式雖然相對復雜,但能夠提供更高的可靠性。
無論是客戶端模式還是服務端模式,重試機制都是保障系統正常運行的重要一環。選擇適合您業務需求的模式,并通過合理的配置項進行優化,將為您的系統帶來更好的表現和用戶體驗。
圖片
關于作者
苑沖,轉轉架構部存儲服務負責人,負責MQ、監控系統、KV存儲、時序數據庫、Redis、KMS秘鑰管理等基礎組件。喜歡深入思考問題,對探索新領域和解決問題充滿熱情。































