DDD與微服務(wù)集成的第一戰(zhàn)役:客戶端重試&服務(wù)端冪等
當(dāng)一個(gè)接口從簡(jiǎn)單的內(nèi)部調(diào)用升級(jí)為遠(yuǎn)程方法調(diào)用(RPC)會(huì)面臨很多問題,比如:
- 本地事務(wù)失效。在內(nèi)部調(diào)用時(shí),多個(gè)方法通常在同一事務(wù)中執(zhí)行,可以使用本地?cái)?shù)據(jù)庫事務(wù)來確保數(shù)據(jù)的一致性。但是,在遠(yuǎn)程方法調(diào)用中,由于涉及到網(wǎng)絡(luò)通信,事務(wù)的邊界會(huì)擴(kuò)展到多個(gè)系統(tǒng)之間,因此無法直接使用本地事務(wù)。如果遠(yuǎn)程方法調(diào)用出現(xiàn)異常,可能會(huì)導(dǎo)致事務(wù)提交失敗,從而產(chǎn)生數(shù)據(jù)不一致;
- 第三狀態(tài)影響。網(wǎng)絡(luò)不確定性可能導(dǎo)致遠(yuǎn)程調(diào)用無法成功獲得結(jié)果,例如網(wǎng)絡(luò)連接中斷、網(wǎng)絡(luò)超時(shí)等。在這種情況下,客戶端無法獲得期望的結(jié)果,調(diào)用會(huì)以網(wǎng)絡(luò)錯(cuò)誤或超時(shí)的方式結(jié)束;
- 服務(wù)版本兼容性問題。如果服務(wù)接口發(fā)生變化,客戶端和服務(wù)端的版本不匹配可能導(dǎo)致調(diào)用失敗;
- 性能問題、可用性問題、安全問題等;
由于涉及的問題比較多,這里重點(diǎn)分析和解決 RPC 調(diào)用時(shí)的第三狀態(tài)問題。
1. 什么是第三狀態(tài)
當(dāng)一個(gè)客戶端發(fā)起一個(gè)RPC請(qǐng)求時(shí),服務(wù)端可能會(huì)返回不同的狀態(tài),包括:
- 成功:服務(wù)端成功完成了客戶端發(fā)送的請(qǐng)求,并返回對(duì)應(yīng)的響應(yīng)結(jié)果;
- 失敗:服務(wù)端無法成功處理客戶端發(fā)送的請(qǐng)求,并返回錯(cuò)誤信息或異常;
- 超時(shí):調(diào)用方無法在規(guī)定時(shí)間收到服務(wù)端的處理結(jié)果,所以無法知道請(qǐng)求的最終處理結(jié)果;
如下圖所示:
圖片
一般情況下,調(diào)用方在規(guī)定時(shí)間收到被調(diào)用方的返回結(jié)果,能夠非常明確的知道處理結(jié)果是成功還是失敗。
當(dāng)網(wǎng)絡(luò)或被調(diào)用方出問題,就會(huì)觸發(fā)超時(shí),比如下圖所示:
圖片
如果被調(diào)用方異常或者網(wǎng)絡(luò)發(fā)生阻塞,調(diào)用方發(fā)送的 Request 請(qǐng)求沒有被正常處理,那調(diào)用方只能在等待若干時(shí)間后拋出異常進(jìn)行流程中斷。
又或者如下圖所示:
圖片
被調(diào)用方處理時(shí)間過長(zhǎng)或者網(wǎng)絡(luò)發(fā)生阻塞,調(diào)用方無法在規(guī)定時(shí)間獲得最終結(jié)果,也只能觸發(fā)超時(shí)中斷。
可見,如果發(fā)生 超時(shí) 情況,對(duì)于處理結(jié)果就處于未知狀態(tài),這就是所謂的“第三狀態(tài)”:
- 被調(diào)用方成功接收到請(qǐng)求并完成了處理;
- 被調(diào)用方完全沒有接收到請(qǐng)求;
在出現(xiàn)第三狀態(tài)時(shí),在不做任何處理前,根本就無法獲取最終的處理結(jié)果。在該場(chǎng)景下,最通用的解決方案便是 客戶端重試 + 服務(wù)端冪等。
- 客戶端重試。指的是在RPC調(diào)用出現(xiàn)超時(shí)的情況下,客戶端自動(dòng)重新發(fā)送相同的請(qǐng)求。然而,在客戶端重試時(shí)需要注意避免重復(fù)執(zhí)行有副作用的操作,比如避免重復(fù)插入數(shù)據(jù);
- 服務(wù)端冪等。指的是對(duì)于相同的輸入請(qǐng)求,服務(wù)端能夠產(chǎn)生相同的結(jié)果,而且不會(huì)對(duì)系統(tǒng)狀態(tài)造成影響。冪等性是為了應(yīng)對(duì)由于重試等原因?qū)е轮貜?fù)執(zhí)行的副作用;
通過客戶端重試和服務(wù)端冪等的方式,可以增加RPC調(diào)用的可靠性和數(shù)據(jù)一致性。客戶端重試可以處理網(wǎng)絡(luò)不穩(wěn)定、服務(wù)端故障等導(dǎo)致的失敗情況,而服務(wù)端冪等性能保證相同的請(qǐng)求不會(huì)重復(fù)執(zhí)行或引起數(shù)據(jù)不一致的問題。這兩個(gè)技術(shù)結(jié)合使用,可以提高分布式系統(tǒng)中RPC調(diào)用的健壯性和可靠性。
2. 客戶端重試
客戶端重試指的是在RPC調(diào)用失敗的情況下,客戶端自動(dòng)重新發(fā)送相同的請(qǐng)求。客戶端可以設(shè)置重試的次數(shù)和時(shí)間間隔,直到得到預(yù)期的成功響應(yīng)或達(dá)到最大重試次數(shù)。客戶端重試機(jī)制可以彌補(bǔ)網(wǎng)絡(luò)不穩(wěn)定性或服務(wù)端異常導(dǎo)致的調(diào)用失敗。然而,在客戶端重試時(shí)需要注意避免重復(fù)執(zhí)行有副作用的操作,比如避免重復(fù)插入數(shù)據(jù)。此外,還需要合理設(shè)置重試策略,避免因頻繁的重試導(dǎo)致網(wǎng)絡(luò)負(fù)荷增加或服務(wù)端壓力過大。
如下圖所示:
圖片
- 第一次獲取商品信息時(shí),由于網(wǎng)絡(luò)異常導(dǎo)致請(qǐng)求超時(shí);
- 網(wǎng)絡(luò)異常觸發(fā) retry 機(jī)制,重新發(fā)起新的請(qǐng)求;
- 新請(qǐng)求成功發(fā)送至商品服務(wù)并獲取信息,從而使得流程從異常中恢復(fù),最終正常執(zhí)行完成;
由此可見,重試的工作原理非常簡(jiǎn)單。
Spring Retry是Spring Framework提供的一個(gè)模塊,用于簡(jiǎn)化和增強(qiáng)應(yīng)用程序中的重試操作。它提供了一種聲明式的方式來處理方法調(diào)用的重試,以應(yīng)對(duì)在分布式系統(tǒng)或有限資源環(huán)境下可能出現(xiàn)的失敗情況。
Spring Retry模塊的主要特性包括:
- 聲明式注解:通過使用注解(例如@Retryable、@Recover等),可以將重試行為與方法關(guān)聯(lián)起來。通過在方法上添加@Retryable注解,可以指定需要進(jìn)行重試的異常類型,最大重試次數(shù),重試間隔等信息;
- 容錯(cuò)策略:Spring Retry提供了多種容錯(cuò)策略,包括簡(jiǎn)單重試、指數(shù)退避重試、固定時(shí)間間隔重試等,開發(fā)者可以根據(jù)具體需求選擇合適的策略;
- 回退機(jī)制:如果重試次數(shù)超過限定值仍然失敗,Spring Retry可以通過@Recover注解來指定一個(gè)回退方法,用于執(zhí)行備選邏輯或返回默認(rèn)值;
要使用Spring Retry,需要在項(xiàng)目中引入相應(yīng)的依賴:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>2.1. Retryable
重試行為主要是由 @Retryable 注解完成,通過為目標(biāo)方法添加注解,將其納入重試管理,Spring Retry將負(fù)責(zé)在出現(xiàn)失敗情況時(shí)自動(dòng)進(jìn)行重試。
以下是 @Retryable 注解代碼示例:
@Service
public class MyService {
@Retryable(value = {SomeException.class}, maxAttempts = 3)
public void doSomething() {
// 需要進(jìn)行重試的業(yè)務(wù)邏輯
}
}上述示例中的 doSomething() 方法標(biāo)記為需要進(jìn)行重試,在捕獲到 SomeException 異常時(shí)觸發(fā)重試,最多重試3次。
下表列出了 @Retryable 注解的所有屬性及其說明:
屬性名 | 說明 |
value | 定義重試的異常類型。默認(rèn)情況下,將捕獲所有 |
maxAttempts | 定義最大重試次數(shù),默認(rèn)為3次。當(dāng)達(dá)到最大重試次數(shù)后,如果仍然失敗,則不再進(jìn)行重試。 |
backoff | 定義重試間隔策略。可以使用 |
delayExpression | 定義重試間隔的SpEL表達(dá)式。可以使用 |
multiplier | 定義指數(shù)退避策略的倍數(shù),默認(rèn)為1。在使用指數(shù)退避策略時(shí),每次重試的間隔時(shí)間將是 |
random | 定義是否啟用隨機(jī)化延遲。默認(rèn)為 |
exceptionExpression | 定義一個(gè)SpEL表達(dá)式,用于決定是否進(jìn)行重試。該表達(dá)式可以使用 |
include | 定義需要包含在重試行為中的異常類型,默認(rèn)為空數(shù)組,表示包括所有異常類型。可以指定一個(gè)或多個(gè)異常類來明確指定需要處理的異常類型。例如, |
exclude | 定義需要排除在重試行為之外的異常類型,默認(rèn)為空數(shù)組,表示排除所有異常類型。可以指定一個(gè)或多個(gè)異常類來明確排除某些異常類型。例如, |
這些屬性可以根據(jù)具體的需求進(jìn)行配置,以實(shí)現(xiàn)靈活的重試策略。
2.2. Recover
Spring Retry 的 fallback 機(jī)制是在重試失敗后,執(zhí)行備選邏輯的一種處理方式。通過使用 @Recover 注解,在重試失敗后,可以調(diào)用備選邏輯方法來完成錯(cuò)誤處理、數(shù)據(jù)清理等操作。
具體來說,當(dāng) @Retryable 注解標(biāo)記的方法達(dá)到最大重試次數(shù)或者拋出了無法重試的異常時(shí),Spring Retry將會(huì)嘗試查找與該方法參數(shù)類型相同的方法,并在找到時(shí)調(diào)用它。
@Retryable(maxAttempts = 3)
public void someMethod(String arg1, int arg2) {
// 重試業(yè)務(wù)邏輯
}
@Recover
public void recover(String arg1, int arg2) {
// 備選邏輯
}當(dāng) someMethod() 方法達(dá)到最大重試次數(shù)或拋出無法重試的異常時(shí),Spring Retry 將會(huì)查找并調(diào)用 recover() 方法,并傳遞相同的方法參數(shù)。在 recover() 方法中,應(yīng)該針對(duì)重試失敗的情況編寫備選邏輯,例如記錄日志、發(fā)送通知等操作。
需要注意的是,@Recover 注解必須放置在其對(duì)應(yīng)的重試方法所屬的類中,并且方法參數(shù)類型必須與重試方法一致。如果有多個(gè)重試方法,每個(gè)重試方法都可以對(duì)應(yīng)一個(gè)@Recover方法,以滿足不同的備選邏輯需求。
在使用fallback機(jī)制時(shí),應(yīng)該仔細(xì)考慮備選邏輯的實(shí)現(xiàn)方式,確保其能夠正確處理重試失敗的情況,并對(duì)數(shù)據(jù)的一致性和完整性產(chǎn)生最小的影響。
2.3. 不同場(chǎng)景下的 Retry 和 Fallback
@Retryable 和 @Recover 都是添加在類方法上的注解,不管是什么場(chǎng)景下的請(qǐng)求只會(huì)走固定流程。這樣的設(shè)計(jì)在復(fù)雜場(chǎng)景下是否夠用?
之前,我們看到通過 Retry 恢復(fù)網(wǎng)絡(luò)抖動(dòng)的場(chǎng)景;接下來讓我們看另一個(gè)場(chǎng)景,如下圖所示:
圖片
image
商品服務(wù)流量激增,導(dǎo)致 DB CPU 飆升,出現(xiàn)大量的慢 SQL,這時(shí)觸發(fā)了系統(tǒng)的 Retry 會(huì)是怎樣的結(jié)果?
- 在獲取商品失敗后,系統(tǒng)自動(dòng)觸發(fā) Retry 機(jī)制;
- 由于是商品服務(wù)本身出了問題,第二次請(qǐng)求仍舊失敗;
- 服務(wù)又觸發(fā)了第三次請(qǐng)求,仍未獲取結(jié)果;
- 達(dá)到最大重試次數(shù),仍舊無法獲取商品,只能通過異常中斷用戶請(qǐng)求;
通過 Retry 機(jī)制未能將流程從異常中恢復(fù)過來,反而給下游的 商品服務(wù) 造成了巨大傷害。
- 商品服務(wù)壓力大,響應(yīng)時(shí)間長(zhǎng);
- 上游系統(tǒng)由于超時(shí)觸發(fā)自動(dòng)重試;
- 自動(dòng)重試增大了對(duì)商品服務(wù)的調(diào)用;
- 商品服務(wù)請(qǐng)求量更大,更難以從故障中恢復(fù);
這就是常說的“讀放大”,假設(shè)用戶驗(yàn)證是否能夠購買請(qǐng)求的請(qǐng)求量為 n,那極端情況下 商品服務(wù)的請(qǐng)求量為 3n (其中 2n 是由 Retry 機(jī)制造成)
此時(shí),最優(yōu)解不是進(jìn)行 Retry 而是直接走 Fallback,給下游服務(wù)一定的恢復(fù)機(jī)會(huì)。
同樣是對(duì)商品服務(wù)接口(同一個(gè)接口)的調(diào)用,在不同的場(chǎng)景需要使用不同的策略用以適配不同的業(yè)務(wù)流程,通常情況下:
- Command 場(chǎng)景優(yōu)先使用 Retry 策略
這種流量即為重要,最好能保障流程的完整性
通常寫流量比較小,小范圍 Retry 不會(huì)對(duì)下游系統(tǒng)造成巨大影響
- Query 場(chǎng)景優(yōu)選使用 Fallback 策略
大多數(shù)展示場(chǎng)景,哪怕部分信息沒有獲取到對(duì)整體的影響也比較小
通常讀場(chǎng)景流量較高,Retry 對(duì)下游系統(tǒng)的傷害不容忽視
面對(duì)不同的業(yè)務(wù)場(chǎng)景,你會(huì)怎么做呢?準(zhǔn)備兩組不同的方法根據(jù)業(yè)務(wù)場(chǎng)景分別調(diào)用?
2.4. SmartFailure
SmartFailure 不是為不同的場(chǎng)景使用不同的方法,而是根據(jù)請(qǐng)求上下文信息,動(dòng)態(tài)的走 Retry 或 Fallback,從而更好的適應(yīng)復(fù)雜的業(yè)務(wù)場(chǎng)景。
整體設(shè)計(jì)如下:
圖片
整體流程如下:
- 讀取配置信息,將請(qǐng)求類型(ActionType)綁定到線程上下文;
- 然后執(zhí)行正常業(yè)務(wù)邏輯
- 當(dāng)調(diào)用 @SmartFault 注解的方法時(shí),會(huì)被 SmartFaultMethodInterceptor 攔截器攔截
攔截器通過 ActionTypeProvider 獲取當(dāng)前的 ActionType;
根據(jù) ActionType 對(duì)請(qǐng)求進(jìn)行路由;
如果是 COMMAND 操作,將使用 RetryTemplate 執(zhí)行請(qǐng)求,在發(fā)生異常時(shí),通過重試配置進(jìn)行請(qǐng)求重發(fā),從而最大限度的獲得遠(yuǎn)程結(jié)果;
如果是 QUERY 操作,將使用 FallbackTemplate(重試次數(shù)為0的 RetryTemplate)執(zhí)行請(qǐng)求,當(dāng)發(fā)生異常時(shí),調(diào)用 fallback 方法,執(zhí)行配置的 recover 方法,直接使用返回結(jié)果;
- 獲取遠(yuǎn)程結(jié)果后,執(zhí)行后續(xù)的業(yè)務(wù)邏輯;
- 最后,ActionAspect 將 ActionType 從線程上下文中移除;
使用前需添加 lego 依賴,具體如下:
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>
</dependency>2.4.1. ActionTypeProvider
首先,需要準(zhǔn)備一個(gè) ActionTypeProvider 用以提供上下文信息。ActionTypeProvider 接口定義如下:
public interface ActionTypeProvider {
ActionType get();
}
public enum ActionType {
COMMAND, QUERY
}通常情況下,我們使用 ThreadLocal 組件將 ActionType 存儲(chǔ)于線程上下文,在使用時(shí)從上下中獲取相關(guān)信息。
public class ActionContext {
private static final ThreadLocal<ActionType> ACTION_TYPE_THREAD_LOCAL = new ThreadLocal<>();
public static void set(ActionType actionType){
ACTION_TYPE_THREAD_LOCAL.set(actionType);
}
public static ActionType get(){
return ACTION_TYPE_THREAD_LOCAL.get();
}
public static void clear(){
ACTION_TYPE_THREAD_LOCAL.remove();
}
}有了上下文之后,ActionBasedActionTypeProvider 直接從 Context 中獲取 ActionType 具體如下:
@Component
public class ActionBasedActionTypeProvider implements ActionTypeProvider {
@Override
public ActionType get() {
return ActionContext.get();
}
}如何對(duì)請(qǐng)求進(jìn)行標(biāo)記?如何對(duì) ActionType 進(jìn)行管理(包括信息綁定和信息清理)?最常用的方式便是:
- 提供一個(gè)注解,在方法上添加注解用于對(duì) ActionType 的配置;
- 提供一個(gè)攔截器,對(duì)方法調(diào)用進(jìn)行攔截:
方法調(diào)用前,從注解中獲取配置信息并綁定到上下文;
執(zhí)行具體的業(yè)務(wù)方法;
方法調(diào)用后,主動(dòng)清理上下文信息;
核心實(shí)現(xiàn)為:
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface Action {
ActionType type();
}
@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class ActionAspect {
@Pointcut("@annotation(com.geekhalo.lego.faultrecovery.smart.Action)")
public void pointcut() {
}
@Around(value = "pointcut()")
public Object action(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Action annotation = methodSignature.getMethod().getAnnotation(Action.class);
ActionContext.set(annotation.type());
try {
return joinPoint.proceed();
}finally {
ActionContext.clear();
}
}
}在這些組件的幫助下,我們只需在方法上基于 @Action 注解進(jìn)行標(biāo)記,便能夠?qū)?ActionType 綁定到上下文。
2.4.2. @SmartFault
在完成 ActionType 綁定到上下文之后,接下來要做的便是對(duì) 遠(yuǎn)程接口 進(jìn)行配置。遠(yuǎn)程接口的配置工作主要由 @SmartFault 來完成。其核心配置項(xiàng)包括:
配置項(xiàng) | 含義 | 默認(rèn)配置 |
recover | fallback 方法名稱 | |
maxRetry | 最大重試次數(shù) | 3 |
include | 觸發(fā)重試的異常類型 | |
exclude | 不需要重新的異常類型 |
測(cè)試 Demo 如下:
@Service
@Slf4j
@Getter
public class RetryService3 {
private int count = 0;
private int retryCount = 0;
private int fallbackCount = 0;
private int recoverCount = 0;
public void clean(){
this.retryCount = 0;
this.fallbackCount = 0;
this.recoverCount = 0;
}
/**
* Command 請(qǐng)求,啟動(dòng)重試機(jī)制
*/
@Action(type = ActionType.COMMAND)
@SmartFault(recover = "recover")
public Long retry(Long input) throws Throwable{
this.retryCount ++;
return doSomething(input);
}
/**
* Query 請(qǐng)求,啟動(dòng)Fallback機(jī)制
*/
@Action(type = ActionType.QUERY)
@SmartFault(recover = "recover")
public Long fallback(Long input) throws Throwable{
this.fallbackCount ++;
return doSomething(input);
}
@Recover
public Long recover(Throwable e, Long input){
this.recoverCount ++;
log.info("recover-{}", input);
return input;
}
private Long doSomething(Long input) {
// 偶數(shù)拋出異常
if (count ++ % 2 == 0){
log.info("Error-{}", input);
throw new RuntimeException();
}
log.info("Success-{}", input);
return input;
}
}2.4.3. 測(cè)試
最后,對(duì)代碼進(jìn)行簡(jiǎn)單測(cè)試:
@SpringBootTest(classes = DemoApplication.class)
public class RetryService3Test {
@Autowired
private RetryService3 retryService;
@BeforeEach
public void setup(){
retryService.clean();
}
@Test
public void retry() throws Throwable{
for (int i = 0; i < 100; i++){
retryService.retry(i + 0L);
}
Assertions.assertTrue(retryService.getRetryCount() > 0);
Assertions.assertTrue(retryService.getRecoverCount() == 0);
Assertions.assertTrue(retryService.getFallbackCount() == 0);
}
@Test
public void fallback() throws Throwable{
for (int i = 0; i < 100; i++){
retryService.fallback(i + 0L);
}
Assertions.assertTrue(retryService.getRetryCount() == 0);
Assertions.assertTrue(retryService.getRecoverCount() > 0);
Assertions.assertTrue(retryService.getFallbackCount() > 0);
}
}運(yùn)行 retry 測(cè)試,日志如下:
[main] c.g.l.c.f.smart.SmartFaultExecutor : action type is COMMAND
[main] c.g.l.faultrecovery.smart.RetryService3 : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor : Retry method public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.retry(java.lang.Long) throws java.lang.Throwable use [0]
[main] c.g.l.faultrecovery.smart.RetryService3 : Success-0可見,當(dāng) action type 為 COMMAND 時(shí):
- 第一次調(diào)用時(shí),觸發(fā)異常,打印:Error-0
- 此時(shí) SmartFaultExecutor 主動(dòng)進(jìn)行重試,打印:Retry method xxxx
- 方法重試成功,RetryService3 打印:Success-0
方法主動(dòng)進(jìn)行重試,流程從異常中恢復(fù),處理過程和效果符合預(yù)期。
運(yùn)行 fallback 測(cè)試,日志如下:
[main] c.g.l.c.f.smart.SmartFaultExecutor : action type is QUERY
[main] c.g.l.faultrecovery.smart.RetryService3 : Error-0
[main] c.g.l.c.f.smart.SmartFaultExecutor : recover From ERROR for method ReflectiveMethodInvocation: public java.lang.Long com.geekhalo.lego.faultrecovery.smart.RetryService3.fallback(java.lang.Long) throws java.lang.Throwable; target is of class [com.geekhalo.lego.faultrecovery.smart.RetryService3]
[main] c.g.l.faultrecovery.smart.RetryService3 : recover-0可見,當(dāng) action type 為 QUERY 時(shí):
- 第一次調(diào)用時(shí),觸發(fā)異常,打印:Error-0
- SmartFaultExecutor 執(zhí)行 Fallback 策略,打印:recover From ERROR for method xxxx
- 調(diào)用RetryService3的 recover 方法,獲取最終返回值。RetryService3 打印:recover-0
異常后自動(dòng)執(zhí)行 fallback,將流程從異常中恢復(fù)過來,處理過程和效果符合預(yù)期。
3. 服務(wù)端冪等
服務(wù)端冪等指的是對(duì)于相同的輸入請(qǐng)求,服務(wù)端能夠產(chǎn)生相同的結(jié)果,而且不會(huì)對(duì)系統(tǒng)狀態(tài)造成影響。冪等性是為了防止由于重試等原因?qū)е碌闹貜?fù)執(zhí)行帶來的副作用。在RPC中,服務(wù)端需要設(shè)計(jì)和實(shí)現(xiàn)冪等性的方法,確保多次接收到相同的請(qǐng)求時(shí)能正確處理,而不會(huì)重復(fù)執(zhí)行對(duì)系統(tǒng)狀態(tài)有改變的操作。
3.1. 冪等與冪等接口
冪等是指對(duì)同一個(gè)操作的多次執(zhí)行所產(chǎn)生的影響與一次執(zhí)行的影響相同,即不會(huì)因?yàn)槎啻螆?zhí)行而產(chǎn)生額外的副作用。在分布式系統(tǒng)中,由于網(wǎng)絡(luò)等各種因素的存在,可能會(huì)導(dǎo)致對(duì)同一個(gè)操作進(jìn)行多次執(zhí)行,此時(shí)如果該操作是冪等的,那么就可以避免數(shù)據(jù)沖突和重復(fù)處理問題。
冪等接口是指對(duì)外提供的接口,它們所提供的業(yè)務(wù)操作具有冪等性。也就是說,在客戶端多次調(diào)用該接口時(shí),每次調(diào)用都會(huì)產(chǎn)生相同的結(jié)果,并且不會(huì)產(chǎn)生額外的副作用。
例如,銀行轉(zhuǎn)賬操作就應(yīng)該是一個(gè)冪等接口。假設(shè)客戶端已經(jīng)成功地向銀行發(fā)起了一次轉(zhuǎn)賬請(qǐng)求,但由于網(wǎng)絡(luò)不穩(wěn)定等原因,導(dǎo)致該請(qǐng)求的響應(yīng)沒有及時(shí)返回給客戶端。此時(shí)客戶端可能會(huì)誤以為轉(zhuǎn)賬請(qǐng)求失敗,進(jìn)而再次發(fā)送同樣的轉(zhuǎn)賬請(qǐng)求。如果銀行的轉(zhuǎn)賬接口是冪等的,那么無論客戶端發(fā)送多少次轉(zhuǎn)賬請(qǐng)求,最終的結(jié)果都應(yīng)該是相同的——只有一次轉(zhuǎn)賬操作會(huì)被執(zhí)行,其他的請(qǐng)求都會(huì)被忽略。
3.2. 天然冪接口
有些接口天然就具備冪等性。這些接口通常是對(duì)資源的查詢操作,不會(huì)對(duì)資源進(jìn)行修改。以下是一些天然具備冪等性的場(chǎng)景:
- 查詢接口:對(duì)于只用于查詢數(shù)據(jù)的接口,例如獲取用戶信息、獲取訂單詳情等,多次調(diào)用不會(huì)對(duì)數(shù)據(jù)產(chǎn)生影響;
- 獲取接口:對(duì)于獲取資源的接口,例如獲取圖片、獲取文件等,無論調(diào)用多少次,得到的都是相同的資源副本;
- 計(jì)算接口:對(duì)于純計(jì)算的接口,例如加法、乘法等數(shù)學(xué)運(yùn)算接口,多次調(diào)用得到的結(jié)果是相同的;
- 驗(yàn)證接口:對(duì)于驗(yàn)證某個(gè)條件的接口,例如檢查用戶名是否已存在、驗(yàn)證手機(jī)號(hào)是否有效等,多次調(diào)用得到的驗(yàn)證結(jié)果都是一致的;
這些接口在設(shè)計(jì)上更容易滿足冪等性的要求,因?yàn)樗鼈兊牟僮鞅旧聿]有產(chǎn)生副作用,不會(huì)對(duì)數(shù)據(jù)進(jìn)行修改。在分布式系統(tǒng)中,可以安全地多次調(diào)用這些接口,而不會(huì)引發(fā)數(shù)據(jù)沖突。
3.3. 非冪等接口
非冪等接口是指對(duì)資源進(jìn)行修改、狀態(tài)進(jìn)行變更而產(chǎn)生副作用的接口,多次調(diào)用可能會(huì)導(dǎo)致不同的結(jié)果。以下是一些常見的非冪等接口:
- 創(chuàng)建資源接口:例如創(chuàng)建用戶、創(chuàng)建訂單等操作,多次調(diào)用會(huì)生成多個(gè)相同的資源實(shí)例,每次調(diào)用都會(huì)產(chǎn)生不同的結(jié)果;
- 修改資源接口:例如更新用戶信息、修改文章內(nèi)容等操作,多次調(diào)用會(huì)對(duì)同一個(gè)資源進(jìn)行多次修改,每次修改都會(huì)產(chǎn)生不同的結(jié)果;
- 刪除資源接口:例如刪除文件、刪除訂單等操作,多次調(diào)用會(huì)多次刪除同一個(gè)資源,每次調(diào)用都會(huì)產(chǎn)生不同的結(jié)果;
這些非冪等接口在設(shè)計(jì)上需要特別注意,并采取合適的措施來確保數(shù)據(jù)的一致性和操作的正確性。
3.4. 重復(fù)請(qǐng)求的處理方式
當(dāng)系統(tǒng)檢測(cè)出當(dāng)前請(qǐng)求是重復(fù)請(qǐng)求時(shí),通常會(huì)有兩種處理策略:
- 直接拋出冪等異常,用以說明該請(qǐng)求為重復(fù)請(qǐng)求;
- 直接返回上次請(qǐng)求一致的返回結(jié)果;
這兩種方案在實(shí)現(xiàn)上差異不大,但在客戶端使用中差異巨大。
- 如果是異常方案,客戶端在調(diào)用冪等接口時(shí)需要對(duì)異常進(jìn)行捕獲,然后通過其他 API 獲取上次請(qǐng)求的處理結(jié)果,根據(jù)結(jié)果不同來決定接下來的處理流程;
- 如果是直接返回上次請(qǐng)求的處理結(jié)果,則客戶端不需要做額外的處理,直接使用重試機(jī)制對(duì)請(qǐng)求進(jìn)行重新發(fā)送即可,獲取結(jié)果后自然進(jìn)入到下面的流程;
所以,在冪等設(shè)計(jì)中,優(yōu)先使用 “直接返回上次請(qǐng)求結(jié)果” 方案。
綜上分析,冪等可以做為一種通用能力與業(yè)務(wù)處理邏輯進(jìn)行充分解構(gòu),這就是組件封裝的前提。
3.5. 冪等組件
我們可以將冪等封裝成一種能力,然后在需要冪等保護(hù)的業(yè)務(wù)方法上進(jìn)行配置,從而實(shí)現(xiàn)冪等能力與業(yè)務(wù)方法的徹底解耦。
整體設(shè)計(jì)如下圖所示:
圖片
整體設(shè)計(jì)比較簡(jiǎn)單,運(yùn)行流程如下:
- IdempotentInterceptor 會(huì)對(duì) @Idempotent 注解標(biāo)記的方法進(jìn)行攔截;
- 當(dāng)方法第一次被調(diào)用時(shí),會(huì)讀取 @Idempotent 注解上的配置信息,使用 IdempotentExecutorFactoris 為每個(gè)方法創(chuàng)建一個(gè) IdempotentExecutor 實(shí)例;
- 在方法調(diào)用時(shí),將請(qǐng)求直接路由到 IdempotentExecutor 實(shí)例,由 IdempotentExecutor 完成核心流程;
- 其中,IdempotentExecutorFactories 擁有多個(gè) IdempotentExecutorFactory 實(shí)例,并根據(jù) @Idempotent 上配置的 executorFactory 屬性使用對(duì)應(yīng)的實(shí)例完成創(chuàng)建工作;
從設(shè)計(jì)上看,系統(tǒng)中可以同時(shí)配置多個(gè) IdempotentExecutorFactory,然后根據(jù)不同的業(yè)務(wù)場(chǎng)景設(shè)置不同的 executorFactory。
冪等處理的核心流程如下:
圖片
IdempotentExecutor 處理核心流程如下:
- 通過 SpringEL 表達(dá)式從入?yún)⒅刑崛?unique key 信息;
- 根據(jù) group 和 unique key 從 ExecutionRecordRepository 中讀取執(zhí)行記錄 ExecutionRecord;
- 如果 ExecutionRecord 為已完成狀態(tài),則根據(jù)配置直接返回 ExecutionRecord 的執(zhí)行結(jié)果 或者 直接拋出 RepeatedSubmitException 異常;
- 如果 ExecutionRecord 為執(zhí)行中,則出現(xiàn)并發(fā)問題,直接拋出 ConcurrentRequestException 異常;
- 如果 ExecutionRecord 為未執(zhí)行,先執(zhí)行方法獲取返回值,然后使用 ExecutionRecordRepository 對(duì) ExecutionRecord 進(jìn)行更新,然后返回執(zhí)行結(jié)果;
使用前需要引入 lego starter,在 maven pom 中添加如下信息:
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>最新版本</version>3.5.1. 配置 dbExecutorFactory
以 JpaRepository 為例實(shí)現(xiàn)對(duì) IdempotentExecutorFactory 的配置,具體如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {
@Bean("dbExecutorFactory")
public IdempotentExecutorFactory dbExecutorFactory(JpaBasedExecutionRecordRepository recordRepository){
return createExecutorFactory(recordRepository);
}
}其中,IdempotentConfigurationSupport 已經(jīng)提供 idempotent 所需的很多 Bean,同時(shí)提供 createExecutorFactory(repository) 方法,用以完成 IdempotentExecutorFactory 的創(chuàng)建。
使用 Jpa 需要調(diào)整 EnableJpaRepositories 相關(guān)配置,具體如下:
@Configuration
@EnableJpaRepositories(basePackages = {
"com.geekhalo.lego.core.idempotent.support.repository"
}, repositoryFactoryBeanClass = JpaBasedQueryObjectRepositoryFactoryBean.class)
public class SpringDataJpaConfiguration {
}其中,com.geekhalo.lego.core.idempotent.support.repository 為固定包名,指向 Jpa 默認(rèn)實(shí)現(xiàn) JpaBasedExecutionRecordRepository,Spring Data Jpa 會(huì)自動(dòng)生成實(shí)現(xiàn)的代理對(duì)象。
最后,在數(shù)據(jù)庫中增加 冪等所需表,sql 如下:
CREATE TABLE `idempotent_execution_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`type` int(11) NOT NULL,
`unique_key` varchar(64) NOT NULL,
`status` int(11) NOT NULL,
`result` varchar(1024) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unq_type_key` (`type`,`unique_key`)
) ENGINE=InnoDB;至此,便完成了基本配置。
【注】關(guān)于 Spring data jpa 配置,可以自行到網(wǎng)上進(jìn)行檢索。
3.5.2. 冪等保護(hù)示例
在方法上增加 @Idempotent 注解便可以使其具備冪等保護(hù),示例如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putForResult(String key, Long data){
return put(key, data);
}其中 @Idempotent 為核心配置,詳細(xì)信息如下:
- executorFactory 為 IdempotentExecutorFactory,及在 IdempotentConfiguration 中配置的bean,默認(rèn)為 DEFAULT_EXECUTOR_FACTORY
- group 為組信息,用于區(qū)分不同的業(yè)務(wù)場(chǎng)景,同一業(yè)務(wù)場(chǎng)景使用相同的配置;
- keyEl 為提取冪等鍵所用的 SpringEl 表達(dá)式,#key 說明入?yún)⒌?key 將作為冪等鍵,group + key 為一個(gè)完整的冪等鍵,唯一識(shí)別一次請(qǐng)求;
- handleType 是處理類型,及重復(fù)提交時(shí)如何處理請(qǐng)求
RESULT,直接返回上次的執(zhí)行結(jié)果
ERROR,直接拋出 RepeatedSubmitException 異常
編寫簡(jiǎn)單的測(cè)試用例如下:
@Test
void putForResult() {
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,返回值和最終結(jié)果符合預(yù)期
Long result = idempotentService.putForResult(key, value);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,返回值和最終結(jié)果 與第一次一致(直接獲取返回值,沒有執(zhí)行業(yè)務(wù)邏輯)
Long valueNew = RandomUtils.nextLong();
Long result = idempotentService.putForResult(key, valueNew);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}運(yùn)行測(cè)試用例,測(cè)試通過,可得出如下結(jié)論:
- 第一次操作,與正常方法一致,成功返回結(jié)果值;
- 第二次操作,邏輯方法未執(zhí)行,直接返回第一次的運(yùn)行結(jié)果;
這是最常見的一種工作模式,除直接返回上次執(zhí)行結(jié)果外,當(dāng)發(fā)生重復(fù)提交時(shí)也可以拋出異常中斷流程,只需將 handleType 設(shè)置為 ERROR 即可,具體如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.ERROR)
@Transactional
public Long putForError(String key, Long data){
return put(key, data);
}編寫測(cè)試用例,具體如下:
@Test
void putForError() {
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,返回值和最終結(jié)果符合預(yù)期
Long result = idempotentService.putForError(key, value);
Assertions.assertEquals(value, result);
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,直接拋出異常,結(jié)果與第一次一致
Assertions.assertThrows(RepeatedSubmitException.class, () ->{
Long valueNew = RandomUtils.nextLong();
idempotentService.putForError(key, valueNew);
});
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}運(yùn)行測(cè)試用例,測(cè)試通過,可以得出:
- 第一次操作,與正常方法一致,成功返回結(jié)果值;
- 第二次操作,直接拋出 RepeatedSubmitException 異常,同時(shí)方法未執(zhí)行,結(jié)果與第一次調(diào)用一致;
3.5.3. 冪等與異常
異常是一種特殊的返回值!!!
如果將異常看做是一種特殊的返回值,那冪等接口在第二次請(qǐng)求時(shí)同樣需要拋出異常,示例代碼如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putExceptionForResult(String key, Long data) {
return putException(key, data);
}
protected Long putException(String key, Long data){
this.data.put(key, data);
throw new IdempotentTestException();
}@Idempotent 注解沒有變化,只是在 putException 方法執(zhí)行后拋出 IdempotentTestException 異常。
編寫簡(jiǎn)單測(cè)試用例如下:
@Test
void putExceptionForResult(){
BaseIdempotentService idempotentService = getIdempotentService();
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
{ // 第一次操作,拋出異常
Assertions.assertThrows(IdempotentTestException.class,
()->idempotentService.putExceptionForResult(key, value));
Assertions.assertEquals(value, idempotentService.getValue(key));
}
{ // 第二次操作,返回值和最終結(jié)果 與第一一致(直接獲取返回值,沒有執(zhí)行業(yè)務(wù)邏輯)
Long valueNew = RandomUtils.nextLong();
Assertions.assertThrows(IdempotentTestException.class,
()->idempotentService.putExceptionForResult(key, valueNew));
Assertions.assertEquals(value, idempotentService.getValue(key));
}
}運(yùn)行測(cè)試用例,用例通過,可知:
- 第一次操作,與方法邏輯一致,更新數(shù)據(jù)并拋出 IdempotentTestException 異常;
- 第二次操作,直接拋出 IdempotentTestException 異常,同時(shí)方法未執(zhí)行,結(jié)果與第一次一致;
3.5.4. 并發(fā)保護(hù)
如果上一個(gè)請(qǐng)求執(zhí)行尚未結(jié)束,新的請(qǐng)求已經(jīng)開啟,那會(huì)如何?
這就是最常見的并發(fā)場(chǎng)景,idempotent 對(duì)其也進(jìn)行了支持,當(dāng)出現(xiàn)并發(fā)請(qǐng)求時(shí)會(huì)直接拋出 ConcurrentRequestException,用于中斷處理。
首先,使用 sleep 模擬一個(gè)耗時(shí)的方法,具體如下:
@Idempotent(executorFactory = "dbExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Transactional
public Long putWaitForResult(String key, Long data) {
return putForWait(key, data);
}
protected Long putForWait(String key, Long data){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return put(key, data);
}putWaitForResult 方法調(diào)用時(shí)會(huì)主動(dòng) sleep 3 秒,然后才執(zhí)行真正的邏輯。
編寫測(cè)試代碼如下:
@Test
void putWaitForResult(){
String key = String.valueOf(RandomUtils.nextLong());
Long value = RandomUtils.nextLong();
// 主線程拋出 ConcurrentRequestException
Assertions.assertThrows(ConcurrentRequestException.class, () ->
testForConcurrent(baseIdempotentService ->
baseIdempotentService.putWaitForResult(key, value))
);
}
private void testForConcurrent(Consumer<BaseIdempotentService> consumer) throws InterruptedException {
// 啟動(dòng)一個(gè)線程執(zhí)行任務(wù),模擬并發(fā)場(chǎng)景
Thread thread = new Thread(() -> consumer.accept(getIdempotentService()));
thread.start();
// 主線程 sleep 1 秒,與異步線程并行執(zhí)行任務(wù)
TimeUnit.SECONDS.sleep(1);
consumer.accept(getIdempotentService());
}運(yùn)行單元測(cè)試,測(cè)試通過,核心測(cè)試邏輯如下:
- 創(chuàng)建一個(gè)線程,執(zhí)行耗時(shí)方法;
- 等待 1 秒后,主線程也執(zhí)行耗時(shí)方法;
- 此時(shí),兩個(gè)線程并發(fā)執(zhí)行耗時(shí)方法,后進(jìn)入的主線程直接拋出 ConcurrentRequestException;
3.5.5. Redis 支持
DB 具有非常好的一致性,但性能存在一定的問題。在一致性要求不高,性能要求高的場(chǎng)景,可以使用 Redis 作為 ExecutionRecord 的存儲(chǔ)引擎。
引入 redis 非常簡(jiǎn)單,大致分兩步:
- 在 IdempotentConfiguration 中注冊(cè) redisExecutorFactory bean;
- @Idempotent 注解中使用 redisExecutorFactory 即可;
添加 redisExecutorFactory Bean,具體如下:
@Configuration
public class IdempotentConfiguration extends IdempotentConfigurationSupport {
@Bean("redisExecutorFactory")
public IdempotentExecutorFactory redisExecutorFactory(ExecutionRecordRepository executionRecordRepository){
return createExecutorFactory(executionRecordRepository);
}
@Bean
public ExecutionRecordRepository executionRecordRepository(RedisTemplate<String, ExecutionRecord> recordRedisTemplate){
return new RedisBasedExecutionRecordRepository("ide-%s-%s", Duration.ofDays(7), recordRedisTemplate);
}
@Bean
public RedisTemplate<String, ExecutionRecord> recordRedisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String, ExecutionRecord> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
Jackson2JsonRedisSerializer<ExecutionRecord> executionRecordJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(ExecutionRecord.class);
executionRecordJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
redisTemplate.setValueSerializer(executionRecordJackson2JsonRedisSerializer);
return redisTemplate;
}
}@Idempotent 注解調(diào)整如下:
@Idempotent(executorFactory = "redisExecutorFactory", group = 1, keyEl = "#key",
handleType = IdempotentHandleType.RESULT)
@Override
public Long putForResult(String key, Long data){
return put(key, data);
}這樣,所有的冪等信息都會(huì)存儲(chǔ)在 redis 中。
【注】一般 redis 不會(huì)對(duì)數(shù)據(jù)進(jìn)行持久存儲(chǔ),只能保障在一段時(shí)間內(nèi)的冪等性,超出時(shí)間后,由于 key 被自動(dòng)清理,冪等將不再生效。對(duì)于業(yè)務(wù)場(chǎng)景不太嚴(yán)格但性能要求較高的場(chǎng)景才可使用,比如為過濾系統(tǒng)中由于 retry 機(jī)制造成的重復(fù)請(qǐng)求。
4. 小結(jié)
當(dāng)系統(tǒng)開啟微服務(wù)化后,服務(wù)調(diào)用的第三狀態(tài)就成為了不可回避的話題。通常情況下,會(huì)綜合使用 客戶端重試 和 服務(wù)端冪等 兩個(gè)方案來解決:
- 客戶端重試。需要根據(jù)不同的場(chǎng)景選擇不同的 Retry 和 Fallback 機(jī)制:
寫請(qǐng)求,建設(shè)使用 Retry 機(jī)制,保障最重要的流量不被浪費(fèi)
讀請(qǐng)求,建議使用 Fallback 機(jī)制,避免重試流量對(duì)下游服務(wù)造成巨大壓力
- 服務(wù)端冪等。冪等作為一種通用能力,建議與業(yè)務(wù)邏輯進(jìn)行分離,在需要的時(shí)候直接在業(yè)務(wù)方法上進(jìn)行配置即可,但仍舊有一些最佳實(shí)踐:
- 使用直接返回上次的處理結(jié)果替代異常中斷,使調(diào)用方更加簡(jiǎn)潔;
- 業(yè)務(wù)異常是一種特殊的返回值,也需要進(jìn)行冪等保護(hù);
- 有冪等保護(hù)后,仍舊需要對(duì)并發(fā)請(qǐng)求進(jìn)行處理,此時(shí)直接通過異常對(duì)重復(fù)流程進(jìn)行中斷即可;
以上兩種場(chǎng)景,lego 對(duì)其都進(jìn)行了封裝,可以方便的應(yīng)用于業(yè)務(wù)系統(tǒng),從而降低微服務(wù)的傷害。























