SpringBoot與Temporal整合,實現訂單處理工作流功能
作者:Java知識日歷
通過 Temporal,我們可以將一些步驟定義為一個復雜的工作流,并利用其強大的特性來管理整個流程,確保每一步都可靠地執行,并且在整個過程中提供靈活的控制和監控能力。
通過 Temporal,我們可以將一些步驟定義為一個復雜的工作流,并利用其強大的特性來管理整個流程,確保每一步都可靠地執行,并且在整個過程中提供靈活的控制和監控能力。
哪些公司使用了Temporal?
- Netflix 使用 Temporal 來管理其復雜的微服務架構中的工作流,特別是在視頻內容處理和推薦系統中。
- Airbnb 利用 Temporal 來管理內部的各種自動化任務,包括數據同步、報告生成和用戶通知等。
- DoorDash 使用 Temporal 來管理配送員的任務分配、訂單跟蹤和物流協調。
- Uber 在多個業務領域使用 Temporal,包括司機調度、訂單處理和客戶服務支持。
- Coinbase 使用 Temporal 來管理其金融交易流程,包括訂單執行、結算和合規檢查。
- Lyft 使用 Temporal 來管理車輛調度、乘客匹配和行程跟蹤。
- Salesforce 使用 Temporal 來管理其內部的自動化任務和客戶工作流。
- Etsy 使用 Temporal 來管理其內部的自動化任務和供應鏈管理流程。
我們為什么選擇Temporal來實現訂單處理工作流?
- 可靠性和一致性: Temporal 提供了強大的分布式事務管理能力,確保每個步驟(如支付、發貨、退款)都能可靠地執行,并且整個流程的一致性得到保證。
- 失敗重試: Temporal 內置了自動重試機制,當某個步驟失敗時,可以自動重試,直到成功或達到最大重試次數。
- 水平擴展: Temporal 可以輕松地水平擴展,適應高并發的訂單處理需求。
- 高效調度: Temporal 使用高效的調度算法來管理任務隊列,確保各個步驟能夠快速而有序地執行。
- 靈活的工作流定義: Temporal 支持復雜的業務邏輯定義,可以通過編程方式構建多分支、循環等工作流。
- 信號和查詢: Temporal 提供了信號和查詢功能,允許在工作流運行過程中動態地發送信號或獲取狀態信息,從而實現靈活的狀態管理和控制。
- 詳細的日志記錄: Temporal 提供了詳細的日志記錄功能,方便開發者監控和調試工作流的執行情況。
- 可視化工具: Temporal 提供了可視化的 Web 界面,幫助開發者直觀地查看工作流的狀態和歷史記錄。
- 開源免費: 對于小型項目或非關鍵業務場景,可以選擇使用開源版本的 Temporal,無需額外費用。
啟動Temporal服務
docker run --name temporal -d -p 7233:7233 temporalio/server:latest代碼實操
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.19.0</version>
</dependency>
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-spring-boot-starter</artifactId>
<version>1.19.0</version>
</dependency>OrderWorkflowApplication
package com.example.orderworkflow;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class OrderWorkflowApplication {
public static void main(String[] args) {
// 啟動Spring Boot應用程序
SpringApplication.run(OrderWorkflowApplication.class, args);
}
@Bean
public WorkflowClient workflowClient() {
// 創建Temporal服務存根
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
// 返回一個新的Workflow客戶端實例
return WorkflowClient.newInstance(service);
}
}activities/OrderActivities
package com.example.orderworkflow.activities;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
@ActivityInterface
public interface OrderActivities {
// 定義處理支付的活動方法
@ActivityMethod
String processPayment(String orderId);
// 定義發貨的活動方法
@ActivityMethod
String shipOrder(String orderId);
// 定義退款的活動方法
@ActivityMethod
String refundOrder(String orderId);
}activities/OrderActivitiesImpl
實現了這三個活動方法的具體邏輯,模擬了支付、發貨和退款的過程。
package com.example.orderworkflow.activities;
import io.temporal.activity.Activity;
import org.springframework.stereotype.Service;
@Service
public class OrderActivitiesImpl implements OrderActivities {
@Override
public String processPayment(String orderId) {
// 模擬處理支付過程
Activity.getExecutionContext().heartbeat("Processing payment for order: " + orderId);
System.out.println("Processing payment for order: " + orderId);
return"PAYMENT_SUCCESS";
}
@Override
public String shipOrder(String orderId) {
// 模擬發貨過程
Activity.getExecutionContext().heartbeat("Shipping order: " + orderId);
System.out.println("Shipping order: " + orderId);
return"SHIP_SUCCESS";
}
@Override
public String refundOrder(String orderId) {
// 模擬退款過程
Activity.getExecutionContext().heartbeat("Refunding order: " + orderId);
System.out.println("Refunding order: " + orderId);
return"REFUND_SUCCESS";
}
}workflows/OrderWorkflow
定義了一個工作流方法 startOrderProcess 和兩個信號方法 retryStep 和 rollbackStep。
package com.example.orderworkflow.workflows;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
@WorkflowInterface
public interface OrderWorkflow {
// 定義啟動訂單處理的工作流方法
@WorkflowMethod
void startOrderProcess(String orderId);
// 定義重試步驟的信號方法
@SignalMethod
void retryStep();
// 定義回滾步驟的信號方法
@SignalMethod
void rollbackStep();
}workflows/OrderWorkflowImpl
實現了工作流邏輯,包括按順序執行支付、發貨步驟,并支持重試和回滾操作。
package com.example.orderworkflow.workflows;
import com.example.orderworkflow.activities.OrderActivities;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.Duration;
import io.temporal.workflow.Workflow;
public class OrderWorkflowImpl implements OrderWorkflow {
// 創建活動存根,并設置活動選項(例如超時時間)
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(5))
.build());
// 記錄當前步驟,默認為“PAYMENT”
private String currentStep = "PAYMENT";
@Override
public void startOrderProcess(String orderId) {
try {
// 如果當前步驟是“PAYMENT”,則執行支付活動
if (currentStep.equals("PAYMENT")) {
activities.processPayment(orderId);
currentStep = "SHIP"; // 支付成功后,移動到下一個步驟“SHIP”
}
// 如果當前步驟是“SHIP”,則執行發貨活動
if (currentStep.equals("SHIP")) {
activities.shipOrder(orderId);
currentStep = "COMPLETED"; // 發貨成功后,標記訂單完成
}
} catch (Exception e) {
// 捕獲并打印異常信息
System.err.println("Error in step: " + currentStep + ", Error message: " + e.getMessage());
throw new RuntimeException(e); // 拋出運行時異常
}
}
@Override
public void retryStep() {
// 打印正在重試的步驟
System.out.println("Retrying step: " + currentStep);
// 重新啟動訂單處理流程
startOrderProcess(Workflow.getInfo().getWorkflowId());
}
@Override
public void rollbackStep() {
// 如果當前步驟是“SHIP”,則執行退款活動并將步驟回滾到“PAYMENT”
if (currentStep.equals("SHIP")) {
activities.refundOrder(Workflow.getInfo().getWorkflowId());
currentStep = "PAYMENT";
}
// 打印回滾后的步驟
System.out.println("Rolled back to step: " + currentStep);
}
}啟動工作流實例
package com.example.orderworkflow;
import com.example.orderworkflow.workflows.OrderWorkflow;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class WorkflowStarter implements CommandLineRunner {
@Autowired
private WorkflowClient workflowClient;
@Override
public void run(String... args) throws Exception {
// 定義工作流選項
WorkflowOptions options = WorkflowOptions.newBuilder()
.setWorkflowId("order-workflow-id")
.setTaskQueue("order-task-queue")
.build();
// 獲取工作流存根
OrderWorkflow workflow = workflowClient.newWorkflowStub(OrderWorkflow.class, options);
// 啟動工作流實例
WorkflowClient.start(workflow::startOrderProcess, "orderId123");
System.out.println("Started workflow with ID: " + workflow.getExecution().getWorkflowId());
}
}責任編輯:武曉燕
來源:
Java知識日歷





























