CompletableFuture的五大坑!
前言
CompletableFuture在并發編程中非常實用,但如果用不好,也很容易踩坑。
今天這篇文章跟大家一起聊聊,CompletableFuture在使用過程中最常見的那些坑,希望對你會有所幫助。
一、CompletableFuture簡介
有些小伙伴在工作中剛開始接觸CompletableFuture時,可能會被它強大的功能所吸引。
確實,CompletableFuture為我們提供了非常優雅的異步編程方式,但正如武俠小說中的神兵利器,如果使用不當,反而會傷到自己。
CompletableFuture的基本用法
先來看一個簡單的CompletableFuture使用示例:
public class BasicCompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 簡單的異步計算
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模擬耗時操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return"Hello, CompletableFuture!";
});
// 獲取結果(阻塞)
String result = future.get();
System.out.println(result);
}
}看起來很簡單對吧?但正是這種表面上的簡單,掩蓋了很多潛在的復雜性。
讓我們通過一個架構圖來理解CompletableFuture的完整生態:
圖片
現在,讓我們開始深入探討各個坑點。
二、線程池使用不當
有些小伙伴在使用CompletableFuture時,往往忽略了線程池的配置,這可能是最容易被忽視但影響最大的坑。
默認線程池的陷阱
public class ThreadPoolPitfall {
// 危險的用法:大量使用默認線程池
public void processBatchData(List<String> dataList) {
List<CompletableFuture<String>> futures = new ArrayList<>();
for (String data : dataList) {
// 使用默認的ForkJoinPool.commonPool()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return processData(data);
});
futures.add(future);
}
// 等待所有任務完成
CompletableFuture.allOf(fatures.toArray(new CompletableFuture[0]))
.join();
}
private String processData(String data) {
// 模擬數據處理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return data.toUpperCase();
}
}問題分析:
- 默認線程池大小是CPU核心數-1
- 在IO密集型任務中,這會導致大量任務排隊等待
- 如果任務提交速度 > 任務處理速度,會造成內存溢出
正確的線程池使用方式
public class ProperThreadPoolUsage {
privatefinal ExecutorService ioBoundExecutor;
privatefinal ExecutorService cpuBoundExecutor;
public ProperThreadPoolUsage() {
// IO密集型任務 - 使用較大的線程池
this.ioBoundExecutor = new ThreadPoolExecutor(
50, // 核心線程數
100, // 最大線程數
60L, TimeUnit.SECONDS, // 空閑線程存活時間
new LinkedBlockingQueue<>(1000), // 工作隊列
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
// CPU密集型任務 - 使用較小的線程池
this.cpuBoundExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // CPU核心數
Runtime.getRuntime().availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
public CompletableFuture<String> processWithProperPool(String data) {
return CompletableFuture.supplyAsync(() -> {
// IO操作,使用IO線程池
return fetchFromDatabase(data);
}, ioBoundExecutor);
}
public CompletableFuture<String> computeWithProperPool(String data) {
return CompletableFuture.supplyAsync(() -> {
// CPU密集型計算,使用CPU線程池
return heavyComputation(data);
}, cpuBoundExecutor);
}
// 資源清理
@PreDestroy
public void destroy() {
ioBoundExecutor.shutdown();
cpuBoundExecutor.shutdown();
try {
if (!ioBoundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
ioBoundExecutor.shutdownNow();
}
if (!cpuBoundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
cpuBoundExecutor.shutdownNow();
}
} catch (InterruptedException e) {
ioBoundExecutor.shutdownNow();
cpuBoundExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}線程池工作流程對比
圖片
三、異常為什么神秘消失了?
有些小伙伴在調試CompletableFuture時,經常會發現異常"神秘消失"了,這其實是CompletableFuture異常處理機制的一個特性。
異常丟失的典型案例
public class ExceptionDisappearance {
public void testExceptionLost() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 這里會拋出異常
return dangerousOperation();
});
// 添加轉換鏈
CompletableFuture<String> resultFuture = future.thenApply(result -> {
System.out.println("處理結果: " + result);
return result + " processed";
});
try {
// 這里不會拋出異常!
String result = resultFuture.get();
System.out.println("最終結果: " + result);
} catch (Exception e) {
// 異常被包裝在ExecutionException中
System.out.println("捕獲到異常: " + e.getClass().getName());
System.out.println("根本原因: " + e.getCause().getMessage());
}
}
private String dangerousOperation() {
thrownew RuntimeException("業務操作失敗!");
}
// 更隱蔽的異常丟失
public void testHiddenExceptionLoss() {
CompletableFuture.supplyAsync(() -> {
thrownew BusinessException("重要異常");
}).thenAccept(result -> {
// 如果上游有異常,這里不會執行
System.out.println("處理結果: " + result);
});
// 程序繼續執行,異常被忽略!
System.out.println("程序正常結束,但異常丟失了!");
}
staticclass BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
}
}CompletableFuture異常處理機制
圖片
正確的異常處理方式
public class ProperExceptionHandling {
// 方法1:使用exceptionally進行恢復
public CompletableFuture<String> handleWithRecovery() {
return CompletableFuture.supplyAsync(() -> {
return riskyOperation();
}).exceptionally(throwable -> {
// 異常恢復
System.err.println("操作失敗,使用默認值: " + throwable.getMessage());
return"default-value";
});
}
// 方法2:使用handle統一處理
public CompletableFuture<String> handleWithUnified() {
return CompletableFuture.supplyAsync(() -> {
return riskyOperation();
}).handle((result, throwable) -> {
if (throwable != null) {
// 處理異常
System.err.println("操作異常: " + throwable.getMessage());
return"error-value";
}
return result + "-processed";
});
}
// 方法3:使用whenComplete進行副作用處理
public CompletableFuture<Void> handleWithSideEffect() {
return CompletableFuture.supplyAsync(() -> {
return riskyOperation();
}).whenComplete((result, throwable) -> {
if (throwable != null) {
// 記錄日志、發送告警等
logError(throwable);
sendAlert(throwable);
} else {
// 正常業務處理
processResult(result);
}
});
}
// 方法4:組合操作中的異常處理
public CompletableFuture<String> handleInComposition() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return operation1();
});
CompletableFuture<String> future2 = future1.thenCompose(result1 -> {
return CompletableFuture.supplyAsync(() -> {
return operation2(result1);
});
});
// 在整個鏈的末尾處理異常
return future2.exceptionally(throwable -> {
Throwable rootCause = getRootCause(throwable);
if (rootCause instanceof BusinessException) {
return"business-fallback";
} elseif (rootCause instanceof TimeoutException) {
return"timeout-fallback";
} else {
return"unknown-error";
}
});
}
private void logError(Throwable throwable) {
// 記錄錯誤日志
System.err.println("錯誤記錄: " + throwable.getMessage());
}
private void sendAlert(Throwable throwable) {
// 發送告警
System.out.println("發送告警: " + throwable.getMessage());
}
private Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
}四、回調地獄:當異步變成"異痛"
有些小伙伴在復雜業務場景中使用CompletableFuture時,很容易陷入回調地獄,代碼變得難以理解和維護。
回調地獄的典型案例
public class CallbackHell {
public CompletableFuture<String> processUserOrder(String userId) {
return getUserInfo(userId)
.thenCompose(userInfo -> {
return getOrderHistory(userInfo.getId())
.thenCompose(orderHistory -> {
return calculateDiscount(userInfo, orderHistory)
.thenCompose(discount -> {
return createOrder(userInfo, discount)
.thenCompose(order -> {
return sendConfirmation(userInfo, order);
});
});
});
});
}
// 上述代碼的"平鋪"版本,同樣難以閱讀
public CompletableFuture<String> processUserOrderFlat(String userId) {
return getUserInfo(userId)
.thenCompose(userInfo -> getOrderHistory(userInfo.getId()))
.thenCompose(orderHistory -> getUserInfo(userId))
.thenCompose(userInfo -> calculateDiscount(userInfo, orderHistory))
.thenCompose(discount -> getUserInfo(userId))
.thenCompose(userInfo -> createOrder(userInfo, discount))
.thenCompose(order -> getUserInfo(userId))
.thenCompose(userInfo -> sendConfirmation(userInfo, order));
}
}結構化異步編程解決方案
public class StructuredAsyncProgramming {
// 定義業務數據類
@Data
@AllArgsConstructor
publicstaticclass OrderContext {
private String userId;
private UserInfo userInfo;
private List<Order> orderHistory;
private Discount discount;
private Order order;
private String result;
}
public CompletableFuture<String> processUserOrderStructured(String userId) {
OrderContext context = new OrderContext(userId, null, null, null, null, null);
return getUserInfo(context.getUserId())
.thenCompose(userInfo -> {
context.setUserInfo(userInfo);
return getOrderHistory(userInfo.getId());
})
.thenCompose(orderHistory -> {
context.setOrderHistory(orderHistory);
return calculateDiscount(context.getUserInfo(), orderHistory);
})
.thenCompose(discount -> {
context.setDiscount(discount);
return createOrder(context.getUserInfo(), discount);
})
.thenCompose(order -> {
context.setOrder(order);
return sendConfirmation(context.getUserInfo(), order);
})
.thenApply(result -> {
context.setResult(result);
return result;
})
.exceptionally(throwable -> {
// 統一異常處理
return handleOrderError(context, throwable);
});
}
// 使用thenCombine處理并行任務
public CompletableFuture<UserProfile> getUserProfile(String userId) {
CompletableFuture<UserInfo> userInfoFuture = getUserInfo(userId);
CompletableFuture<List<Order>> orderHistoryFuture = getOrderHistory(userId);
CompletableFuture<List<Address>> addressesFuture = getUserAddresses(userId);
return userInfoFuture.thenCombine(orderHistoryFuture, (userInfo, orders) -> {
returnnew UserProfile(userInfo, orders, null);
}).thenCombine(addressesFuture, (profile, addresses) -> {
profile.setAddresses(addresses);
return profile;
});
}
// 使用allOf處理多個獨立任務
public CompletableFuture<Map<String, Object>> getDashboardData(String userId) {
CompletableFuture<UserInfo> userInfoFuture = getUserInfo(userId);
CompletableFuture<List<Order>> ordersFuture = getOrderHistory(userId);
CompletableFuture<List<Notification>> notificationsFuture = getNotifications(userId);
CompletableFuture<Preferences> preferencesFuture = getPreferences(userId);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
userInfoFuture, ordersFuture, notificationsFuture, preferencesFuture
);
return allFutures.thenApply(v -> {
Map<String, Object> dashboard = new HashMap<>();
try {
dashboard.put("userInfo", userInfoFuture.get());
dashboard.put("orders", ordersFuture.get());
dashboard.put("notifications", notificationsFuture.get());
dashboard.put("preferences", preferencesFuture.get());
} catch (Exception e) {
thrownew CompletionException(e);
}
return dashboard;
});
}
}異步編程模式對比
圖片
更推薦的方案:
圖片
五、內存泄漏:隱藏的資源消耗者
有些小伙伴可能沒有意識到,不當使用CompletableFuture會導致內存泄漏,特別是在長時間運行的應用中。
內存泄漏的常見場景
public class MemoryLeakDemo {
privatefinal Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
// 場景1:無限增長的緩存
public CompletableFuture<String> getDataWithLeak(String key) {
return cache.computeIfAbsent(key, k -> {
return CompletableFuture.supplyAsync(() -> fetchData(k));
});
}
// 場景2:未完成的Future積累
public void processWithUnfinishedFutures() {
for (int i = 0; i < 100000; i++) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模擬長時間運行或阻塞的任務
try {
Thread.sleep(Long.MAX_VALUE); // 幾乎永久阻塞
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return"result";
});
// future永遠不會完成,但一直存在于內存中
}
}
// 場景3:循環引用
publicclass TaskManager {
private CompletableFuture<String> currentTask;
private String status = "INIT";
public void startTask() {
currentTask = CompletableFuture.supplyAsync(() -> {
// 任務持有Manager的引用
while (!"COMPLETED".equals(status)) {
// 處理任務
processTask();
}
return"done";
});
}
// Manager也持有任務的引用
public CompletableFuture<String> getCurrentTask() {
return currentTask;
}
}
}內存泄漏檢測和預防
public class MemoryLeakPrevention {
privatefinal Cache<String, CompletableFuture<String>> cache;
public MemoryLeakPrevention() {
// 使用Guava Cache自動清理
this.cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterAccess(10, TimeUnit.MINUTES)
.removalListener((RemovalListener<String, CompletableFuture<String>>) notification -> {
if (notification.getCause() == RemovalCause.SIZE ||
notification.getCause() == RemovalCause.EXPIRED) {
// 取消未完成的任務
CompletableFuture<String> future = notification.getValue();
if (!future.isDone()) {
future.cancel(true);
}
}
})
.build();
}
// 安全的緩存用法
public CompletableFuture<String> getDataSafely(String key) {
try {
return cache.get(key, () -> {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> fetchData(key));
// 添加超時控制
return future.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(throwable -> {
// 發生異常時從緩存中移除
cache.invalidate(key);
return"fallback-data";
});
});
} catch (ExecutionException e) {
thrownew RuntimeException(e);
}
}
// 使用WeakReference避免循環引用
publicstaticclass SafeTaskManager {
private WeakReference<CompletableFuture<String>> currentTaskRef;
public void startTask() {
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
return performTask();
});
currentTaskRef = new WeakReference<>(task);
// 任務完成后自動清理
task.whenComplete((result, error) -> {
currentTaskRef = null;
});
}
}
// 監控和診斷工具
public void monitorFutures() {
// 定期檢查未完成的Future
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
int unfinishedCount = 0;
for (CompletableFuture<?> future : cache.asMap().values()) {
if (!future.isDone()) {
unfinishedCount++;
// 記錄長時間運行的任務
if (future.isDoneExceptionally()) {
// 處理異常任務
handleExceptionalFuture(future);
}
}
}
if (unfinishedCount > 100) {
// 發出警告
System.err.println("警告: 有 " + unfinishedCount + " 個未完成的任務");
}
}
}, 0, 60000); // 每分鐘檢查一次
}
private void handleExceptionalFuture(CompletableFuture<?> future) {
// 處理異常Future,避免它們一直存在
future.exceptionally(throwable -> {
// 記錄異常日志
System.err.println("任務異常: " + throwable.getMessage());
returnnull;
});
}
}內存泄漏檢測流程
圖片
六、超時控制缺失
有些小伙伴在使用CompletableFuture時,經常會忘記設置超時控制,這可能導致線程永遠阻塞。
超時問題的嚴重性
public class TimeoutPitfalls {
// 危險的代碼:沒有超時控制
public String dangerousGet() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模擬網絡問題導致的無限阻塞
return blockingNetworkCall();
});
try {
// 如果任務永遠不完成,這里會永遠阻塞
return future.get();
} catch (Exception e) {
return"error";
}
}
// 資源泄漏的示例
public void resourceLeakExample() {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
CompletableFuture.runAsync(() -> {
try {
// 長時間運行的任務
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor);
}
// 線程池中的線程都被占用,無法執行新任務
}
private String blockingNetworkCall() {
// 模擬網絡問題
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return"response";
}
}完整的超時控制方案
public class CompleteTimeoutSolution {
privatefinal ScheduledExecutorService timeoutExecutor;
public CompleteTimeoutSolution() {
this.timeoutExecutor = Executors.newScheduledThreadPool(2);
}
// 方法1:使用orTimeout(Java 9+)
public CompletableFuture<String> withOrTimeout() {
return CompletableFuture.supplyAsync(() -> {
return externalServiceCall();
}).orTimeout(5, TimeUnit.SECONDS) // 5秒超時
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
return"timeout-fallback";
}
return"error-fallback";
});
}
// 方法2:使用completeOnTimeout(Java 9+)
public CompletableFuture<String> withCompleteOnTimeout() {
return CompletableFuture.supplyAsync(() -> {
return externalServiceCall();
}).completeOnTimeout("timeout-default", 3, TimeUnit.SECONDS);
}
// 方法3:手動超時控制(Java 8兼容)
public CompletableFuture<String> withManualTimeout() {
CompletableFuture<String> taskFuture = CompletableFuture.supplyAsync(() -> {
return externalServiceCall();
});
CompletableFuture<String> timeoutFuture = new CompletableFuture<>();
// 設置超時
timeoutExecutor.schedule(() -> {
timeoutFuture.completeExceptionally(new TimeoutException("操作超時"));
}, 5, TimeUnit.SECONDS);
// 哪個先完成就返回哪個
return taskFuture.applyToEither(timeoutFuture, Function.identity())
.exceptionally(throwable -> {
if (throwable instanceof TimeoutException) {
return"manual-timeout-fallback";
}
return"other-error-fallback";
});
}
// 方法4:分層超時控制
public CompletableFuture<String> withLayeredTimeout() {
return CompletableFuture.supplyAsync(() -> {
return phase1Operation();
}).orTimeout(2, TimeUnit.SECONDS)
.thenCompose(phase1Result -> {
return CompletableFuture.supplyAsync(() -> {
return phase2Operation(phase1Result);
}).orTimeout(3, TimeUnit.SECONDS);
})
.thenCompose(phase2Result -> {
return CompletableFuture.supplyAsync(() -> {
return phase3Operation(phase2Result);
}).orTimeout(5, TimeUnit.SECONDS);
})
.exceptionally(throwable -> {
Throwable rootCause = getRootCause(throwable);
if (rootCause instanceof TimeoutException) {
// 根據超時階段提供不同的降級策略
return"timeout-in-phase";
}
return"general-fallback";
});
}
// 方法5:可配置的超時策略
public CompletableFuture<String> withConfigurableTimeout(String operationType) {
TimeoutConfig config = getTimeoutConfig(operationType);
return CompletableFuture.supplyAsync(() -> {
return performOperation(operationType);
}).orTimeout(config.getTimeout(), config.getTimeUnit())
.exceptionally(throwable -> {
return config.getFallbackStrategy().apply(throwable);
});
}
@PreDestroy
public void destroy() {
timeoutExecutor.shutdown();
try {
if (!timeoutExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutExecutor.shutdownNow();
}
} catch (InterruptedException e) {
timeoutExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// 超時配置類
@Data
publicstaticclass TimeoutConfig {
privatefinallong timeout;
privatefinal TimeUnit timeUnit;
privatefinal Function<Throwable, String> fallbackStrategy;
}
private TimeoutConfig getTimeoutConfig(String operationType) {
switch (operationType) {
case"fast":
returnnew TimeoutConfig(1, TimeUnit.SECONDS,
t -> "fast-timeout");
case"normal":
returnnew TimeoutConfig(5, TimeUnit.SECONDS,
t -> "normal-timeout");
case"slow":
returnnew TimeoutConfig(30, TimeUnit.SECONDS,
t -> "slow-timeout");
default:
returnnew TimeoutConfig(10, TimeUnit.SECONDS,
t -> "default-timeout");
}
}
}超時控制策略
圖片
總結
通過上面的詳細分析,我們可以看到CompletableFuture雖然強大,但也確實存在不少陷阱。
最后的建議
- 理解原理:不要只是機械地使用API,要理解CompletableFuture的工作原理
- 適度使用:不是所有場景都需要異步,同步代碼更簡單易懂
- 測試覆蓋:異步代碼的測試很重要,要覆蓋各種邊界情況
- 監控告警:在生產環境中要有完善的監控和告警機制
- 持續學習:關注Java并發編程的新特性和最佳實踐
記住,工具是為了提高生產力,而不是制造問題。
掌握了這些避坑技巧,CompletableFuture將成為你手中強大的并發編程利器!
























