基于 CompletionService 實(shí)現(xiàn)高效并發(fā):一成功即返回的策略
前言
在并發(fā)編程中,我們經(jīng)常會(huì)遇到這樣的場(chǎng)景:需要同時(shí)調(diào)用多個(gè)服務(wù)或方法,只要其中一個(gè)成功就立即返回成功結(jié)果,只有當(dāng)所有調(diào)用都失敗時(shí)才返回失敗。這種場(chǎng)景在服務(wù)容錯(cuò)、多源數(shù)據(jù)獲取等領(lǐng)域非常常見(jiàn)。
什么是 CompletionService
圖片
CompletionService是Java并發(fā)包中的一個(gè)接口,它結(jié)合了Executor和BlockingQueue的功能,能夠更方便地管理異步任務(wù)的執(zhí)行和結(jié)果獲取。其主要優(yōu)勢(shì)在于:
- 可以按照任務(wù)完成的順序獲取結(jié)果,而不是提交順序
- 提供了超時(shí)獲取結(jié)果的能力
- 簡(jiǎn)化了并發(fā)任務(wù)的管理流程
CompletionService的核心實(shí)現(xiàn)類是ExecutorCompletionService,它需要一個(gè)Executor來(lái)執(zhí)行任務(wù),本質(zhì)上是對(duì)Executor的一種裝飾。
CompletionService的實(shí)現(xiàn)目標(biāo)是任務(wù)先完成可優(yōu)先獲取到,即結(jié)果按照完成先后順序排序。
與CompletableFuture區(qū)別
CompletionService
- 定位:
Java 5引入,是對(duì)Executor和BlockingQueue的組合封裝,核心解決按任務(wù)完成順序獲取結(jié)果的問(wèn)題。 - 設(shè)計(jì)理念:簡(jiǎn)化多個(gè)異步任務(wù)的結(jié)果收集流程,避免按提交順序等待任務(wù)完成(傳統(tǒng)
Future集合需要逐個(gè)檢查是否完成,效率低)。 - 本質(zhì):通過(guò)內(nèi)部維護(hù)一個(gè)阻塞隊(duì)列,當(dāng)任務(wù)完成時(shí)自動(dòng)將結(jié)果放入隊(duì)列,用戶只需從隊(duì)列中獲取即可,無(wú)需關(guān)心任務(wù)提交順序。
CompletableFuture
- 定位:
Java 8引入,是Future接口的增強(qiáng)實(shí)現(xiàn),支持異步任務(wù)的鏈?zhǔn)秸{(diào)用、組合、依賴管理等復(fù)雜操作。 - 設(shè)計(jì)理念:基于函數(shù)式編程思想,提供非阻塞的回調(diào)機(jī)制,允許將多個(gè)異步任務(wù)串聯(lián)或并聯(lián)成一個(gè)完整的流程,解決 “回調(diào)地獄” 問(wèn)題。
- 本質(zhì):不僅能獲取任務(wù)結(jié)果,還能描述任務(wù)之間的依賴關(guān)系(如任務(wù)
A完成后執(zhí)行任務(wù)B任務(wù)A和B都完成后執(zhí)行任務(wù)C)。
代碼案例
并發(fā)調(diào)用3個(gè)接口,按接口返回順序處理結(jié)果(不關(guān)心提交順序)
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 創(chuàng)建線程池和CompletionService
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 提交3個(gè)任務(wù)(模擬不同接口調(diào)用,執(zhí)行時(shí)間不同)
completionService.submit(() -> {
Thread.sleep(3000); // 模擬耗時(shí)3秒
return"接口A結(jié)果";
});
completionService.submit(() -> {
Thread.sleep(1000); // 模擬耗時(shí)1秒
return"接口B結(jié)果";
});
completionService.submit(() -> {
Thread.sleep(2000); // 模擬耗時(shí)2秒
return"接口C結(jié)果";
});
// 按完成順序獲取結(jié)果(預(yù)期順序:B→C→A)
for (int i = 0; i < 3; i++) {
Future<String> future = completionService.take(); // 阻塞等待下一個(gè)完成的任務(wù)
System.out.println("處理結(jié)果:" + future.get());
}
executor.shutdown();
}
}先調(diào)用接口A,再用A的結(jié)果調(diào)用接口B,最后將B的結(jié)果與接口C的結(jié)果合并。
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 接口A:返回用戶ID
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
return"user123";
});
// 接口B:依賴A的結(jié)果,返回用戶信息
CompletableFuture<String> futureB = futureA.thenCompose(userId ->
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
return"用戶信息:" + userId;
})
);
// 接口C:獨(dú)立任務(wù),返回用戶積分
CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); }
return 1000;
});
// 合并B和C的結(jié)果
CompletableFuture<String> combined = futureB.thenCombine(futureC,
(userInfo, score) -> userInfo + ",積分:" + score
);
System.out.println(combined.get()); // 阻塞獲取最終結(jié)果
}
}實(shí)現(xiàn)思路
要實(shí)現(xiàn)只要有一個(gè)成功就立即返回,否則等所有失敗才返回的邏輯,我們可以采用以下策略:
- 創(chuàng)建一個(gè)線程池和對(duì)應(yīng)的
CompletionService - 向
CompletionService提交所有需要執(zhí)行的任務(wù) - 循環(huán)獲取已完成的任務(wù)結(jié)果
- 一旦發(fā)現(xiàn)有成功的結(jié)果,立即取消所有未完成的任務(wù)并返回成功
- 如果所有任務(wù)都執(zhí)行完畢且都失敗,則返回失敗
示例代碼
// 任務(wù)結(jié)果封裝類
class TaskResult {
private boolean success;
private String message;
private String taskName;
public TaskResult(boolean success, String message, String taskName) {
this.success = success;
this.message = message;
this.taskName = taskName;
}
public boolean isSuccess() {
return success;
}
public String getMessage() {
return message;
}
public String getTaskName() {
return taskName;
}
}
// 模擬業(yè)務(wù)任務(wù)
class BusinessTask implements Callable<TaskResult> {
private String taskName;
private int executionTime; // 執(zhí)行時(shí)間(毫秒)
private boolean shouldSucceed; // 是否應(yīng)該成功
public BusinessTask(String taskName, int executionTime, boolean shouldSucceed) {
this.taskName = taskName;
this.executionTime = executionTime;
this.shouldSucceed = shouldSucceed;
}
@Override
public TaskResult call() throws Exception {
System.out.println("任務(wù) " + taskName + " 開(kāi)始執(zhí)行");
// 模擬任務(wù)執(zhí)行時(shí)間
Thread.sleep(executionTime);
if (shouldSucceed) {
return new TaskResult(true, taskName + " 執(zhí)行成功", taskName);
} else {
throw new Exception(taskName + " 執(zhí)行失敗");
}
}
}
public class ConcurrentTaskDemo {
public static void main(String[] args) {
// 創(chuàng)建線程池
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<TaskResult> completionService = new ExecutorCompletionService<>(executor);
// 存儲(chǔ)所有提交的任務(wù),用于后續(xù)可能的取消操作
List<Future<TaskResult>> futures = new ArrayList<>();
try {
// 提交三個(gè)任務(wù)
futures.add(completionService.submit(new BusinessTask("支付接口A", 1000, false)));
futures.add(completionService.submit(new BusinessTask("支付接口B", 2000, true)));
futures.add(completionService.submit(new BusinessTask("支付接口C", 3000, false)));
// 等待任務(wù)完成,只要有一個(gè)成功就返回
TaskResult successResult = null;
int taskCount = futures.size();
for (int i = 0; i < taskCount; i++) {
try {
// 獲取已完成的任務(wù)結(jié)果,設(shè)置超時(shí)時(shí)間
Future<TaskResult> future = completionService.poll(5, TimeUnit.SECONDS);
if (future == null) {
System.out.println("獲取任務(wù)結(jié)果超時(shí)");
continue;
}
// 如果沒(méi)有異常拋出,說(shuō)明任務(wù)成功
TaskResult result = future.get();
successResult = result;
break; // 只要有一個(gè)成功就跳出循環(huán)
} catch (ExecutionException e) {
// 任務(wù)執(zhí)行失敗,繼續(xù)等待其他任務(wù)
System.out.println("任務(wù)執(zhí)行失敗: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.out.println("等待任務(wù)完成超時(shí)");
}
}
// 處理最終結(jié)果
if (successResult != null) {
System.out.println("整體執(zhí)行成功: " + successResult.getMessage());
// 取消其他可能還在執(zhí)行的任務(wù)
cancelRemainingTasks(futures);
} else {
System.out.println("所有任務(wù)都執(zhí)行失敗");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("線程被中斷: " + e.getMessage());
} finally {
// 關(guān)閉線程池
executor.shutdown();
}
}
// 取消所有未完成的任務(wù)
private static void cancelRemainingTasks(List<Future<TaskResult>> futures) {
for (Future<TaskResult> future : futures) {
if (!future.isDone()) {
future.cancel(true);
System.out.println("取消未完成的任務(wù)");
}
}
}
}關(guān)鍵技術(shù)點(diǎn)
CompletionService的使用:通過(guò)其poll方法可以按完成順序獲取任務(wù)結(jié)果,還可以設(shè)置超時(shí)時(shí)間。- 任務(wù)取消機(jī)制:當(dāng)一個(gè)任務(wù)成功后,我們需要取消其他未完成的任務(wù)以節(jié)省資源,這通過(guò)
Future的cancel方法實(shí)現(xiàn)。 - 異常處理:任務(wù)執(zhí)行失敗會(huì)拋出
ExecutionException,我們捕獲這個(gè)異常并繼續(xù)等待其他任務(wù)。 - 線程池管理:使用完線程池后必須關(guān)閉,以釋放資源。
注意事項(xiàng)
- 線程安全:確保提交給
CompletionService的任務(wù)是線程安全的,避免共享可變狀態(tài)。 - 資源釋放:無(wú)論任務(wù)執(zhí)行成功與否,都要確保線程池被正確關(guān)閉。
- 超時(shí)設(shè)置:合理設(shè)置
poll方法的超時(shí)時(shí)間,避免無(wú)限等待。 - 任務(wù)取消的局限性:對(duì)于已經(jīng)開(kāi)始執(zhí)行的任務(wù),
cancel (true)只能通過(guò)中斷來(lái)嘗試停止,如果任務(wù)不響應(yīng)中斷,可能無(wú)法真正取消。 - 線程池大小:根據(jù)任務(wù)特性合理設(shè)置線程池大小,過(guò)多的線程會(huì)消耗更多資源,過(guò)少則可能影響并發(fā)效率。





















