轉(zhuǎn)轉(zhuǎn)門(mén)店商詳頁(yè)異步編程的實(shí)踐

- 1 背景
- 2 CompletableFuture的發(fā)展
- 2.1 Future簡(jiǎn)介
- 2.2 Future的局限性
- 3 關(guān)于CompletableFuture
- 3.1 CompletableFuture提供的能力
- 3.2 示例場(chǎng)景的解決方案
- 3.3 實(shí)際應(yīng)用效果
- 4 總結(jié)
- 5 參考資料
1 背景
轉(zhuǎn)轉(zhuǎn)門(mén)店商詳頁(yè)為用戶在小程序、APP等終端提供下單、預(yù)約看機(jī)的能力。下圖為轉(zhuǎn)轉(zhuǎn)門(mén)店小程序的商詳頁(yè)。
圖片
起初業(yè)務(wù)體量小,QPS低,內(nèi)容簡(jiǎn)單,接口采用簡(jiǎn)單的串行實(shí)現(xiàn),但隨著門(mén)店業(yè)務(wù)的發(fā)展,頁(yè)面信息的豐富,QPS的提升,串行實(shí)現(xiàn)的缺點(diǎn)也顯露出來(lái):訪問(wèn)耗時(shí)高,代碼耦合嚴(yán)重。
假設(shè)門(mén)店商詳頁(yè)有以下幾個(gè)模塊,每個(gè)模塊需要如下時(shí)間才能完成:
- 獲取商品基礎(chǔ)信息 0.5s
- 獲取優(yōu)惠信息 1s
- 獲取門(mén)店信息 1s
- 獲取驗(yàn)機(jī)評(píng)估報(bào)告信息 1s
- 獲取標(biāo)品參數(shù)信息 1s
- 組裝返回商品信息 0.5s
串行執(zhí)行每個(gè)模塊,那么一共需要5s才能返回給調(diào)用方,如果RPC接口產(chǎn)生超時(shí)等,會(huì)比5s還要長(zhǎng),顯然是不能接受的。
圖片
如果有多個(gè)線程并行完成各個(gè)模塊,可能2s內(nèi)就能返回信息。
圖片
在JDK1.8中引入了CompletableFuture實(shí)現(xiàn)類,擴(kuò)展了Future和CompletionStage,實(shí)現(xiàn)異步多線程任務(wù)執(zhí)行。
本文主要以獲取門(mén)店商詳頁(yè)信息為例介紹CompletableFuture的常用方法和使用。
2 CompletableFuture的發(fā)展
常見(jiàn)的線程創(chuàng)建方式有兩種,一是直接繼承Thread,另一種是實(shí)現(xiàn)Runnable接口。但這兩種方式有個(gè)缺點(diǎn),不支持獲取線程執(zhí)行結(jié)果,所以在JDK1.5之后,提供了Callable和Future,可以在任務(wù)執(zhí)行后獲取執(zhí)行結(jié)果。
2.1 Future簡(jiǎn)介
Future類位于java.util.concurrent包下,從下面的源碼可以看出,F(xiàn)uture主要提供了三種能力:
- 關(guān)閉執(zhí)行中的任務(wù)
- 判斷任務(wù)是否執(zhí)行完成
- 獲取任務(wù)執(zhí)行的結(jié)果
package java.util.concurrent;
public interface Future<V> {
// 取消執(zhí)行中的任務(wù)
boolean cancel(boolean mayInterruptIfRunning);
// 判斷任務(wù)是否被取消成功
boolean isCancelled();
// 判斷任務(wù)是否執(zhí)行完成
boolean isDone();
// 獲取任務(wù)執(zhí)行結(jié)果
V get() throws InterruptedException, ExecutionException;
// 在規(guī)定時(shí)間內(nèi)獲取任務(wù)執(zhí)行結(jié)果,若規(guī)定時(shí)間任務(wù)還沒(méi)執(zhí)行完,則返回null,而非拋異常
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}2.2 Future的局限性
Future通過(guò)isDone()判斷任務(wù)是否執(zhí)行完成,get()獲取任務(wù)執(zhí)行的結(jié)果,解決了創(chuàng)建線程異步執(zhí)行任務(wù)不能獲取執(zhí)行結(jié)果的問(wèn)題,在任務(wù)異步執(zhí)行中,主線程在等待過(guò)程中可以做其他事,但其本身也存在一定的局限性。
- 并行執(zhí)行多任務(wù)獲取結(jié)果主線程長(zhǎng)時(shí)間阻塞:當(dāng)需要將多個(gè)模塊的任務(wù)異步執(zhí)行時(shí),使用for循環(huán)遍歷任務(wù)列表,通過(guò)
isDone()輪詢判斷任務(wù)是否執(zhí)行完成,通過(guò)get()方法獲取任務(wù)執(zhí)行結(jié)果,且結(jié)果獲取依賴任務(wù)執(zhí)行順序。但因Future的get()方法是主線程阻塞等待獲取執(zhí)行結(jié)果,所以在結(jié)果返回前,主線程不能處理其他任務(wù),長(zhǎng)時(shí)間阻塞,可能會(huì)產(chǎn)生block,在使用時(shí)考慮用超時(shí)時(shí)間的get()方法。 - 不能鏈?zhǔn)綀?zhí)行任務(wù):如上述場(chǎng)景,希望在獲取商品基礎(chǔ)信息后執(zhí)行獲取優(yōu)惠信息任務(wù),F(xiàn)uture沒(méi)有提供這種能力,不能實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用。
- 不能將多個(gè)任務(wù)執(zhí)行的結(jié)果組合:在上述場(chǎng)景中,希望獲取商詳所需的各個(gè)模塊信息后,組合成調(diào)用方需要的結(jié)果,但Future不支持。
- 不能處理異常:Future沒(méi)有異常處理的能力。
綜上所述,阻塞主線程獲取結(jié)果的方式與異步編程的初衷相違背,輪詢判斷任務(wù)是否執(zhí)行完成會(huì)耗費(fèi)不必要的CPU資源,為優(yōu)化上述問(wèn)題,在JDK1.8時(shí)引入了CompletableFuture實(shí)現(xiàn)類,提供異步鏈?zhǔn)骄幊痰哪芰Α?/p>
3 關(guān)于CompletableFuture
CompletableFuture擴(kuò)展了Future接口,實(shí)現(xiàn)了CompletionStage,提供了函數(shù)式編程的能力,通過(guò)回調(diào)的方式處理計(jì)算結(jié)果,并提供了轉(zhuǎn)換和組合CompletableFuture的方法,從而簡(jiǎn)化異步編程的復(fù)雜性。下圖為CompletableFuture類圖。
圖片
3.1 CompletableFuture提供的能力
CompletableFuture依然有Future的能力,可以通過(guò)get()獲取結(jié)果,但不推薦使用,那么CompletableFuture提供了哪些能力呢?
3.1.1 創(chuàng)建異步任務(wù)
CompletableFuture提供了四種創(chuàng)建異步對(duì)象的方法,如下:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)其中runAsync表示無(wú)返回值的異步對(duì)象,supplyAsync為有返回值的異步對(duì)象,仍然可以通過(guò)如果不指定線程池會(huì)默認(rèn)使用ForkJoinPool.commonPool(),建議使用自定義的線程池,和其他場(chǎng)景區(qū)分開(kāi),避免線程資源競(jìng)爭(zhēng),也易于監(jiān)控線程使用情況。
在上述獲取商品詳情頁(yè)信息的示例中,想獲取驗(yàn)機(jī)評(píng)估報(bào)告信息,可以使用無(wú)返回值的runAsync創(chuàng)建對(duì)象,使用自定義的線程池testThreadPool,結(jié)果集使用自定義的對(duì)象ResultDTO接收,里面定義各個(gè)模塊結(jié)果的成員變量,如果使用Map接收結(jié)果,需要注意線程安全問(wèn)題。
ExecutorService testThreadPool = Executors.newFixedThreadPool(10);
ResultDTO resultDTO = new ResultDTO();
CompletableFuture<Void> baseInfoFuture = CompletableFuture.runAsync(() -> {
System.out.println("獲取商品驗(yàn)機(jī)評(píng)估報(bào)告任務(wù)當(dāng)前線程" + Thread.currentThread().getId());
//獲取報(bào)告rpc接口邏輯
resultMap.put("baseInfo", "");
}, testThreadPool).exceptionally(e -> {
System.out.println(e.getMessage());
return null;
});3.1.2 異步回調(diào)
CompletableFuture可以在任務(wù)處理結(jié)束異步回調(diào),主要提供了以下回調(diào)方法:
//當(dāng)上一個(gè)任務(wù)執(zhí)行結(jié)束時(shí),回調(diào)方法接收上一段任務(wù)執(zhí)行結(jié)果或異常,返回上一段任務(wù)結(jié)果
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
//當(dāng)上一個(gè)任務(wù)執(zhí)行結(jié)束時(shí),回調(diào)方法接收上一段任務(wù)執(zhí)行結(jié)果或異常,返回回調(diào)方法的結(jié)果
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
//異步回調(diào)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
//當(dāng)上一個(gè)任務(wù)執(zhí)行出現(xiàn)異常時(shí),返回回調(diào)中的結(jié)果
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)3.1.3 異步編排
CompletionStage接口定義了異步編排的各種場(chǎng)景,CompletableFuture實(shí)現(xiàn)了該接口,每個(gè)CompletionStage表示異步編排的某個(gè)階段,任務(wù)執(zhí)行可能在上一個(gè)階段完成時(shí)觸發(fā),也可能后面幾個(gè)階段全部完成或某個(gè)階段完成時(shí)觸發(fā)。
原理大致是每個(gè)新建的CompletableFuture對(duì)象,會(huì)將當(dāng)前方法需要執(zhí)行的任務(wù)封裝成Completion對(duì)象,將其壓入到上個(gè)方法中創(chuàng)建的異步任務(wù)棧中,Completion為一個(gè)鏈表結(jié)構(gòu),保存依賴的任務(wù),CAS出鏈表后,tryFire()執(zhí)行依賴任務(wù),從而實(shí)現(xiàn)任務(wù)串聯(lián)。
比較常用的場(chǎng)景有在上一階段完成時(shí)觸發(fā)的串行場(chǎng)景,多個(gè)階段都執(zhí)行完成觸發(fā)的AND,或者多個(gè)階段中某一階段執(zhí)行完觸發(fā)的OR。下面為方法示例。
//異步接收上一階段任務(wù)結(jié)果,且有返回值
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
//異步接收上一階段任務(wù)結(jié)果,但無(wú)返回值
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//等待全部異步任務(wù)執(zhí)行結(jié)束
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
//某個(gè)任務(wù)執(zhí)行結(jié)束即返回
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)在上述獲取商詳單示例中,可以使用thenAcceptAsync在獲取商品基礎(chǔ)信息后異步獲取優(yōu)惠信息,如下代碼。
CompletableFuture<Void> productBaseInfoFuture = CompletableFuture.runAsync(() -> {
ProductInfoDTO base = new ProductInfoDTO();
BaseInfoDTO baseInfoDTO = rpcxx;
resultDTO.setBaseInfoDTO(baseInfoDTO);
}, testThreadPool).thenAcceptAsync(() -> {
CouponInfoDTO couponInfoDTO = rpcxx;
resultDTO.setCouponInfoDTO(couponInfoDTO);
}, testThreadPool);返回結(jié)果:
圖片
3.2 示例場(chǎng)景的解決方案
通過(guò)上述分析,可以將獲取商品詳情頁(yè)的步驟分為三步,分別為:
- 獲取商品基礎(chǔ)信息、商品驗(yàn)機(jī)評(píng)估報(bào)告信息、商品標(biāo)品參數(shù)信息、門(mén)店信息;
- 獲取商品優(yōu)惠信息,需要等1結(jié)束;
- 將上述信息RPC結(jié)果組裝返回,需要等1,2結(jié)束。
因?yàn)槎嗳蝿?wù)異步并行執(zhí)行,最終耗時(shí)將取決于耗時(shí)最長(zhǎng)的鏈路。如下圖所示:
圖片
代碼示例:
ExecutorService testThreadPool = Executors.newFixedThreadPool(10);
ResultDTO resultDTO = new ResultDTO();
//基礎(chǔ)信息
CompletableFuture<Void> productBaseInfoFuture = CompletableFuture.runAsync(() -> {
BaseInfoDTO baseInfoDTO = rpcxx;
resultDTO.setBaseInfoDTO(baseInfoDTO);
}, testThreadPool);
//優(yōu)惠信息
CompletableFuture<Void> couponInfoFuture = productBaseInfoFuture.thenAcceptAsync(() -> {
CouponInfoDTO couponInfoDTO = rpcxx;
resultDTO.setCouponInfoDTO(couponInfoDTO);
}, testThreadPool);
//驗(yàn)機(jī)評(píng)估報(bào)告信息
CompletableFuture<Void> qcInfoFuture = CompletableFuture.runAsync(() -> {
QcInfoDTO qcInfoDTO = rpcxx;
resultDTO.setQcInfoDTO(qcInfoDTO);
}, testThreadPool);
//門(mén)店信息
CompletableFuture<Void> storeInfoFuture = CompletableFuture.runAsync(() -> {
StoreInfoDTO storeInfoDTO = rpcxx;
resultDTO.setStoreInfoDTO(storeInfoDTO);
}, testThreadPool);
//標(biāo)品參數(shù)信息
CompletableFuture<Void> spuInfoFuture = CompletableFuture.runAsync(() -> {
SpuInfoDTO spuInfoDTO = rpcxx;
resultDTO.setSpuInfoDTO(spuInfoDTO);
}, testThreadPool);
//組裝結(jié)果
CompletableFuture<Void> allQuery = CompletableFuture.allOf(couponInfoFuture, qcInfoFuture, storeInfoFuture, spuInfoFuture);
CompletableFuture<Void> buildFuture = allQuery.thenAcceptAsync((result) -> {
//組裝邏輯
return null;
}).join();以上即為獲取門(mén)店商品詳情頁(yè)異步編排的實(shí)現(xiàn)邏輯,但也發(fā)現(xiàn),該方案創(chuàng)建多個(gè)異步任務(wù),執(zhí)行邏輯不一樣但流程大致相同,類似的代碼重復(fù)寫(xiě)多遍,不便于擴(kuò)展和閱讀。
在此基礎(chǔ)上可以優(yōu)化為使用CompletableFuture+簡(jiǎn)單工廠+策略模式,將上述步驟中的每個(gè)模塊都作為策略handler,且策略之間有權(quán)重依賴關(guān)系,模塊類型作為工廠類型,將模塊類型放進(jìn)列表中,使用CompletableFuture.allOf()異步執(zhí)行列表中的任務(wù)。
偽代碼如下:
List<String> eventList = Arrays.asList("xx", "xxx");
CompletableFuture.allOf(eventList.stream().map(event ->
CompletableFuture.runAsync(() -> {
//通過(guò)工廠類型獲取策略實(shí)現(xiàn)handler
if (Objects.nonNull(handler)) {
//如果存在則執(zhí)行
}
}, testThreadPool)).toArray(CompletableFuture[]::new)).join();3.3 實(shí)際應(yīng)用效果
最終轉(zhuǎn)轉(zhuǎn)門(mén)店商詳頁(yè)接口重構(gòu)采用CompletableFuture+簡(jiǎn)單工廠+策略模式的方案,解耦堆疊的代碼塊,使功能更易擴(kuò)展,可閱讀性更高,在重構(gòu)代碼上線后,效果明顯。
4 總結(jié)
以上就是關(guān)于使用CompletableFuture進(jìn)行異步編程的全部?jī)?nèi)容,在此基礎(chǔ)上我們還要注意以下幾點(diǎn):
- 當(dāng)有多個(gè)任務(wù)可以異步并行執(zhí)行時(shí),使用CompletableFuture,任務(wù)越多效果越明顯;
- 使用CompletableFuture可以將多個(gè)任務(wù)串聯(lián)執(zhí)行,也可以利用組合方式將任務(wù)排列由列表變成樹(shù)結(jié)構(gòu);
- 在使用集合接收多線程處理任務(wù)的結(jié)果時(shí),需要考慮線程安全問(wèn)題;
- 當(dāng)任務(wù)執(zhí)行有相互依賴關(guān)系時(shí),需考慮任務(wù)超時(shí)主動(dòng)結(jié)束,避免系統(tǒng)block。




































