Flink SQL for E-commerce Order Status Tracking and Real-Time Processing
I. System Architecture Overview
In a large e-commerce platform, real-time tracking and processing of order status is a core part of keeping transactions flowing smoothly. The system described here is built on Flink SQL and manages the full order lifecycle from creation to completion, triggering inventory updates, logistics dispatch, and other key business actions in real time. It uses a layered design with a data ingestion layer, a processing layer, and an output layer, connected by event streams so that status changes propagate with millisecond-level latency.
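As a quick orientation before the detailed sections, the three layers map directly onto Flink SQL building blocks: source tables for ingestion, views for processing, and sink tables fed by INSERT INTO statements. The following is only a minimal sketch of that shape; the demo_* names are placeholders, and the real definitions follow in the sections below.
-- Ingestion layer: a Kafka-backed source table (placeholder schema)
CREATE TABLE demo_order_events (
  order_id STRING,
  status STRING,
  status_time TIMESTAMP(3),
  WATERMARK FOR status_time AS status_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', 'topic' = 'demo_orders',
        'properties.bootstrap.servers' = 'kafka-broker:9092', 'format' = 'json');

-- Processing layer: a view that derives business facts from the raw stream
CREATE VIEW demo_paid_orders AS
SELECT order_id, status_time FROM demo_order_events WHERE status = 'PAID';

-- Output layer: a sink table plus the continuous INSERT that feeds it
CREATE TABLE demo_paid_orders_sink (
  order_id STRING,
  status_time TIMESTAMP(3)
) WITH ('connector' = 'kafka', 'topic' = 'demo_paid_orders',
        'properties.bootstrap.servers' = 'kafka-broker:9092', 'format' = 'json');

INSERT INTO demo_paid_orders_sink SELECT order_id, status_time FROM demo_paid_orders;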

II. Environment Setup and Basic Configuration
1. Flink environment configuration
-- Set Flink execution parameters
SET 'execution.checkpointing.interval' = '30000';     -- checkpoint every 30 seconds
SET 'execution.checkpointing.timeout' = '60000';      -- checkpoint timeout
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';  -- exactly-once semantics
SET 'state.backend' = 'rocksdb';                      -- use RocksDB as the state backend
SET 'table.exec.state.ttl' = '86400000';              -- keep SQL state for 1 day
SET 'parallelism.default' = '12';                     -- default parallelism of 12

2. Basic data type definitions
-- Flink SQL does not support user-defined ENUM types (CREATE TYPE ... AS ENUM is
-- PostgreSQL syntax), so status fields below are declared as STRING and constrained
-- by convention to the following value sets.

-- Order status (OrderStatus) values:
--   'PENDING_PAYMENT'   -- awaiting payment
--   'PAID'              -- paid
--   'PROCESSING'        -- processing
--   'SHIPPED'           -- shipped
--   'DELIVERED'         -- delivered
--   'CANCELLED'         -- cancelled
--   'REFUNDED'          -- refunded
--   'EXPIRED'           -- expired

-- Inventory operation type (InventoryOpType) values:
--   'INCREASE'          -- increase stock
--   'DECREASE'          -- decrease stock
--   'FREEZE'            -- freeze stock
--   'UNFREEZE'          -- unfreeze stock
--   'ADJUST'            -- adjust stock

-- Logistics status (LogisticsStatus) values:
--   'PENDING_DISPATCH'  -- awaiting dispatch
--   'DISPATCHED'        -- dispatched
--   'IN_TRANSIT'        -- in transit
--   'OUT_FOR_DELIVERY'  -- out for delivery
--   'DELIVERED'         -- delivered
--   'FAILED'            -- delivery failed

III. Data Ingestion Layer
1. Order event stream ingestion (Kafka)
-- Order status change event stream
CREATE TABLE order_status_events (
  order_id STRING,                    -- order ID
  user_id STRING,                     -- user ID
  status STRING,                      -- order status (OrderStatus values)
  prev_status STRING,                 -- previous status
  status_time TIMESTAMP(3),           -- time of the status change
  payment_time TIMESTAMP(3),          -- payment time (if any)
  cancel_reason STRING,               -- cancellation reason (if any)
  operation_user STRING,              -- operator
  ext_info MAP<STRING, STRING>,       -- extended attributes
  event_time AS PROCTIME(),           -- processing-time attribute
  WATERMARK FOR status_time AS status_time - INTERVAL '5' SECOND  -- watermark, allow 5 seconds of lateness
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_status_events',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'flink_order_tracking_group',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'scan.startup.mode' = 'earliest-offset',
  'properties.auto.offset.reset' = 'earliest'
);
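Once the source table is registered, a quick way to sanity-check the connector and JSON parsing is an ad-hoc query from the SQL client before wiring up the downstream views. This is only a manual check, not part of the pipeline:
-- Ad-hoc check: peek at incoming paid-order events from the SQL client
SELECT order_id, user_id, status, status_time
FROM order_status_events
WHERE status = 'PAID';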
-- Order item detail stream
CREATE TABLE order_item_events (
  order_id STRING,              -- order ID
  item_id STRING,               -- item ID
  sku_id STRING,                -- SKU ID
  product_name STRING,          -- product name
  quantity INT,                 -- quantity
  price DECIMAL(10,2),          -- unit price
  discount DECIMAL(10,2),       -- discount amount
  create_time TIMESTAMP(3),     -- creation time
  WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_item_events',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'flink_order_item_group',
  'format' = 'json'
);

2. Inventory data ingestion (MySQL + CDC)
-- Product inventory base table (ingested via CDC)
CREATE TABLE product_inventory (
  product_id STRING,            -- product ID
  sku_id STRING,                -- SKU ID
  total_stock INT,              -- total stock
  available_stock INT,          -- available stock
  frozen_stock INT,             -- frozen stock
  locked_stock INT,             -- locked stock
  update_time TIMESTAMP(3),     -- last update time
  PRIMARY KEY (product_id, sku_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-inventory',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'database-name' = 'inventory_db',
  'table-name' = 'product_inventory',
  'server-time-zone' = 'Asia/Shanghai'
);

3. Logistics data ingestion (Kafka + HBase)
-- Logistics shipment event stream
CREATE TABLE logistics_events (
  logistics_id STRING,          -- shipment ID
  order_id STRING,              -- order ID
  status STRING,                -- logistics status (LogisticsStatus values)
  status_time TIMESTAMP(3),     -- status timestamp
  location STRING,              -- current location
  courier_id STRING,            -- courier ID
  courier_name STRING,          -- courier name
  WATERMARK FOR status_time AS status_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'logistics_events',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'properties.group.id' = 'flink_logistics_group',
  'format' = 'json'
);

-- Logistics area dimension table (HBase)
-- The HBase connector requires column families to be declared as ROW types;
-- a single family named 'info' is assumed here
CREATE TABLE logistics_area_info (
  area_id STRING,               -- row key: area ID
  info ROW<
    province STRING,            -- province
    city STRING,                -- city
    district STRING,            -- district/county
    warehouse_id STRING         -- serving warehouse ID
  >,
  PRIMARY KEY (area_id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'logistics:area_info',
  'zookeeper.quorum' = 'zk-node1,zk-node2,zk-node3',
  'zookeeper.znode.parent' = '/hbase'
);

IV. Data Processing Layer
1. Core order status transition logic
(1) Order status cleansing and normalization
-- View that cleans and normalizes order status events
CREATE VIEW cleaned_order_status_events AS
SELECT
  order_id,
  user_id,
  status,
  prev_status,
  status_time,
  payment_time,
  -- Normalize the cancellation reason
  CASE
    WHEN status = 'CANCELLED' THEN
      CASE
        WHEN cancel_reason IS NULL OR cancel_reason = '' THEN 'UNKNOWN'
        WHEN cancel_reason IN ('user_cancel', '用戶取消') THEN 'USER_CANCEL'
        WHEN cancel_reason IN ('stock_out', '庫存不足') THEN 'STOCK_OUT'
        WHEN cancel_reason IN ('payment_timeout', '支付超時') THEN 'PAYMENT_TIMEOUT'
        ELSE 'OTHER'
      END
    ELSE NULL
  END AS cancel_reason_standardized,
  operation_user,
  ext_info,
  -- Order creation time (timestamp of the first status change)
  FIRST_VALUE(status_time) OVER (PARTITION BY order_id ORDER BY status_time) AS create_time,
  -- Time spent since the previous status change
  status_time - LAG(status_time) OVER (PARTITION BY order_id ORDER BY status_time) AS status_duration,
  event_time
FROM order_status_events;

(2) Order lifecycle tracking
-- Order lifecycle table (tracks the complete lifecycle of each order)
CREATE TABLE order_lifecycle (
  order_id STRING,
  user_id STRING,
  create_time TIMESTAMP(3),
  pending_payment_time TIMESTAMP(3),
  paid_time TIMESTAMP(3),
  processing_time TIMESTAMP(3),
  shipped_time TIMESTAMP(3),
  delivered_time TIMESTAMP(3),
  cancelled_time TIMESTAMP(3),
  refunded_time TIMESTAMP(3),
  expired_time TIMESTAMP(3),
  cancel_reason STRING,
  current_status STRING,            -- order status (OrderStatus values)
  status_updated_time TIMESTAMP(3),
  -- Duration of each stage
  pending_payment_duration BIGINT,
  processing_duration BIGINT,
  shipping_duration BIGINT,
  delivery_duration BIGINT,
  overall_duration BIGINT,
  last_updated AS PROCTIME(),
  PRIMARY KEY (order_id) NOT ENFORCED   -- required by upsert-kafka
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'order_lifecycle',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false'
);
-- Populate the order lifecycle table
INSERT INTO order_lifecycle
SELECT
  order_id,
  user_id,
  MAX(create_time) AS create_time,
  MAX(CASE WHEN status = 'PENDING_PAYMENT' THEN status_time END) AS pending_payment_time,
  MAX(CASE WHEN status = 'PAID' THEN status_time END) AS paid_time,
  MAX(CASE WHEN status = 'PROCESSING' THEN status_time END) AS processing_time,
  MAX(CASE WHEN status = 'SHIPPED' THEN status_time END) AS shipped_time,
  MAX(CASE WHEN status = 'DELIVERED' THEN status_time END) AS delivered_time,
  MAX(CASE WHEN status = 'CANCELLED' THEN status_time END) AS cancelled_time,
  MAX(CASE WHEN status = 'REFUNDED' THEN status_time END) AS refunded_time,
  MAX(CASE WHEN status = 'EXPIRED' THEN status_time END) AS expired_time,
  MAX(CASE WHEN status = 'CANCELLED' THEN cancel_reason_standardized END) AS cancel_reason,
  -- Most recent status seen for the order
  LAST_VALUE(status) AS current_status,
  MAX(status_time) AS status_updated_time,
  -- Stage durations in seconds
  TIMESTAMPDIFF(SECOND, MAX(CASE WHEN status = 'PENDING_PAYMENT' THEN status_time END),
                        MAX(CASE WHEN status = 'PAID' THEN status_time END)) AS pending_payment_duration,
  TIMESTAMPDIFF(SECOND, MAX(CASE WHEN status = 'PAID' THEN status_time END),
                        MAX(CASE WHEN status = 'SHIPPED' THEN status_time END)) AS processing_duration,
  -- NOTE: OUT_FOR_DELIVERY originates in the logistics stream; the next two durations
  -- are only populated if that status is also written into order_status_events
  TIMESTAMPDIFF(SECOND, MAX(CASE WHEN status = 'SHIPPED' THEN status_time END),
                        MAX(CASE WHEN status = 'OUT_FOR_DELIVERY' THEN status_time END)) AS shipping_duration,
  TIMESTAMPDIFF(SECOND, MAX(CASE WHEN status = 'OUT_FOR_DELIVERY' THEN status_time END),
                        MAX(CASE WHEN status = 'DELIVERED' THEN status_time END)) AS delivery_duration,
  TIMESTAMPDIFF(SECOND, MAX(create_time), MAX(status_time)) AS overall_duration
FROM cleaned_order_status_events
GROUP BY order_id, user_id;

2. Inventory update logic
(1) Generating inventory operation events
-- View that derives inventory operations from order status changes
CREATE VIEW inventory_operation_events AS
SELECT
  UUID() AS op_id,                    -- operation ID
  o.order_id,
  oi.sku_id,
  oi.quantity,
  -- Map the order status transition to an inventory operation type
  CASE
    WHEN o.status = 'PAID' THEN 'FREEZE'            -- payment succeeded: freeze stock
    WHEN o.status = 'SHIPPED' THEN 'DECREASE'       -- shipped: decrease stock
    WHEN o.status = 'CANCELLED' AND o.prev_status IN ('PAID', 'PROCESSING') THEN 'UNFREEZE'  -- cancelled: unfreeze stock
    WHEN o.status = 'REFUNDED' THEN 'INCREASE'      -- refunded: return stock
    ELSE NULL
  END AS op_type,
  o.status_time AS op_time,
  'ORDER_SYSTEM' AS source_system,
  o.event_time
FROM cleaned_order_status_events o
-- One inventory operation per order item
JOIN order_item_events oi ON o.order_id = oi.order_id
-- Only status transitions that affect inventory
WHERE
  (o.status = 'PAID' AND o.prev_status = 'PENDING_PAYMENT') OR
  (o.status = 'SHIPPED' AND o.prev_status = 'PROCESSING') OR
  (o.status = 'CANCELLED' AND o.prev_status IN ('PAID', 'PROCESSING')) OR
  (o.status = 'REFUNDED');

(2) Inventory concurrency control and updates
-- Inventory update result table
CREATE TABLE inventory_update_results (
  op_id STRING,
  order_id STRING,
  sku_id STRING,
  op_type STRING,               -- inventory operation type (InventoryOpType values)
  quantity INT,
  prev_available_stock INT,
  new_available_stock INT,
  prev_frozen_stock INT,
  new_frozen_stock INT,
  op_time TIMESTAMP(3),
  process_time TIMESTAMP(3),
  status STRING,                -- SUCCESS, FAILED, RETRY
  message STRING,
  PRIMARY KEY (op_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-inventory:3306/inventory_db',
  'table-name' = 'inventory_update_results',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '5s',
  'sink.max-retries' = '3'
);
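The JDBC sink writes into a physical MySQL table, which has to exist before the job starts. The following is a minimal sketch of that DDL under the assumption of MySQL 8.x and column types mirroring the Flink schema above; adjust names and types to your own conventions.
-- MySQL-side table backing the JDBC sink (run once in inventory_db)
CREATE TABLE inventory_update_results (
  op_id                VARCHAR(64) PRIMARY KEY,
  order_id             VARCHAR(64),
  sku_id               VARCHAR(64),
  op_type              VARCHAR(16),
  quantity             INT,
  prev_available_stock INT,
  new_available_stock  INT,
  prev_frozen_stock    INT,
  new_frozen_stock     INT,
  op_time              DATETIME(3),
  process_time         DATETIME(3),
  status               VARCHAR(16),
  message              VARCHAR(255)
);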
-- Main inventory update logic
INSERT INTO inventory_update_results
SELECT
  op_id,
  order_id,
  sku_id,
  op_type,
  quantity,
  prev_available,
  new_available,
  prev_frozen,
  new_frozen,
  op_time,
  CURRENT_TIMESTAMP AS process_time,
  CASE
    WHEN (op_type = 'DECREASE' AND prev_available < quantity) THEN 'FAILED'
    WHEN (op_type = 'FREEZE' AND prev_available < quantity) THEN 'FAILED'
    ELSE 'SUCCESS'
  END AS status,
  CASE
    WHEN (op_type = 'DECREASE' AND prev_available < quantity) THEN 'Insufficient stock'
    WHEN (op_type = 'FREEZE' AND prev_available < quantity) THEN 'Insufficient stock to freeze'
    ELSE 'Operation successful'
  END AS message
FROM (
  -- Join each operation against the current inventory snapshot
  SELECT
    op.op_id,
    op.order_id,
    op.sku_id,
    op.op_type,
    op.quantity,
    op.op_time,
    -- New available stock, depending on the operation type
    CASE op.op_type
      WHEN 'INCREASE' THEN inv.available_stock + op.quantity
      WHEN 'DECREASE' THEN inv.available_stock - op.quantity
      WHEN 'FREEZE' THEN inv.available_stock - op.quantity
      WHEN 'UNFREEZE' THEN inv.available_stock + op.quantity
      ELSE inv.available_stock
    END AS new_available,
    inv.available_stock AS prev_available,
    -- Frozen stock bookkeeping
    CASE op.op_type
      WHEN 'FREEZE' THEN inv.frozen_stock + op.quantity
      WHEN 'UNFREEZE' THEN inv.frozen_stock - op.quantity
      ELSE inv.frozen_stock
    END AS new_frozen,
    inv.frozen_stock AS prev_frozen
  FROM inventory_operation_events op
  -- Temporal join against the current inventory version
  JOIN product_inventory FOR SYSTEM_TIME AS OF op.event_time AS inv
    ON op.sku_id = inv.sku_id
) t
-- Drop status changes that do not map to an inventory operation
WHERE op_type IS NOT NULL;

3. Logistics dispatch triggering and optimization
(1) Creating and dispatching logistics orders
-- Logistics dispatch command table
CREATE TABLE logistics_dispatch_commands (
  command_id STRING,
  order_id STRING,
  user_id STRING,
  sku_id STRING,
  quantity INT,
  warehouse_id STRING,
  target_province STRING,
  target_city STRING,
  target_district STRING,
  target_address STRING,
  required_delivery_time TIMESTAMP(3),
  priority STRING,              -- HIGH, MEDIUM, LOW
  create_time TIMESTAMP(3),
  status STRING,                -- PENDING, DISPATCHED, FAILED
  PRIMARY KEY (command_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'logistics_dispatch_commands',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
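The dispatch query below joins two dimension tables, user_address and product_info, which are not defined anywhere else in this article. The following is a minimal sketch of what they might look like, under the assumption that both live in MySQL and are ingested the same way as product_inventory; the hostnames, database names, and exact columns are illustrative, not part of the original system.
-- Assumed user shipping-address dimension table (illustrative schema)
CREATE TABLE user_address (
  user_id STRING,
  province STRING,
  city STRING,
  district STRING,
  address STRING,
  is_default BOOLEAN,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-user',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'database-name' = 'user_db',
  'table-name' = 'user_address'
);

-- Assumed product dimension table (illustrative schema)
CREATE TABLE product_info (
  sku_id STRING,
  product_id STRING,
  category STRING,              -- e.g. 'fresh', 'digital'
  PRIMARY KEY (sku_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-product',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'database-name' = 'product_db',
  'table-name' = 'product_info'
);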
-- Dispatch triggering logic
INSERT INTO logistics_dispatch_commands
SELECT
  UUID() AS command_id,
  o.order_id,
  o.user_id,
  oi.sku_id,
  oi.quantity,
  la.info.warehouse_id AS warehouse_id,
  u.province AS target_province,
  u.city AS target_city,
  u.district AS target_district,
  u.address AS target_address,
  -- Expected delivery time, derived from the product category
  CASE
    WHEN p.category = 'fresh' THEN o.status_time + INTERVAL '24' HOUR
    WHEN p.category = 'digital' THEN o.status_time + INTERVAL '48' HOUR
    ELSE o.status_time + INTERVAL '72' HOUR
  END AS required_delivery_time,
  -- Priority derived from the order amount
  CASE
    WHEN SUM(oi.quantity * oi.price) OVER (PARTITION BY o.order_id) > 1000 THEN 'HIGH'
    WHEN SUM(oi.quantity * oi.price) OVER (PARTITION BY o.order_id) > 500 THEN 'MEDIUM'
    ELSE 'LOW'
  END AS priority,
  o.status_time AS create_time,
  'PENDING' AS status
FROM cleaned_order_status_events o
-- Order item details
JOIN order_item_events oi ON o.order_id = oi.order_id
-- Default shipping address of the user
JOIN user_address FOR SYSTEM_TIME AS OF o.event_time AS u
  ON o.user_id = u.user_id
 AND u.is_default = TRUE
-- Product information, for the category
JOIN product_info FOR SYSTEM_TIME AS OF o.event_time AS p
  ON oi.sku_id = p.sku_id
-- Area information, to pick the serving warehouse
JOIN logistics_area_info la
  ON u.district = la.info.district
-- Only orders that have just been paid and are awaiting shipment
WHERE o.status = 'PAID'
  AND o.prev_status = 'PENDING_PAYMENT'
  -- Idempotency guard: skip orders that already have a non-failed dispatch command
  AND NOT EXISTS (
    SELECT 1 FROM logistics_dispatch_commands c
    WHERE c.order_id = o.order_id AND c.status <> 'FAILED'
  );

(2) Logistics efficiency monitoring and optimization
-- View with logistics efficiency metrics
CREATE VIEW logistics_efficiency_metrics AS
WITH order_logistics AS (
  -- Join orders with their logistics events
  SELECT
    o.order_id,
    o.status_time AS paid_time,
    l.logistics_id,
    MIN(CASE WHEN l.status = 'DISPATCHED' THEN l.status_time END) AS dispatched_time,
    MIN(CASE WHEN l.status = 'IN_TRANSIT' THEN l.status_time END) AS transit_time,
    MIN(CASE WHEN l.status = 'OUT_FOR_DELIVERY' THEN l.status_time END) AS delivery_time,
    MIN(CASE WHEN l.status = 'DELIVERED' THEN l.status_time END) AS received_time,
    la.info.warehouse_id AS warehouse_id,
    la.info.city AS warehouse_city,
    u.city AS target_city
  FROM cleaned_order_status_events o
  LEFT JOIN logistics_events l ON o.order_id = l.order_id
  LEFT JOIN logistics_area_info la ON l.location = la.area_id
  LEFT JOIN user_address u ON o.user_id = u.user_id AND u.is_default = TRUE
  WHERE o.status = 'PAID'
  GROUP BY o.order_id, o.status_time, l.logistics_id, la.info.warehouse_id, la.info.city, u.city
)
-- Per-stage latency metrics
SELECT
  order_id,
  logistics_id,
  warehouse_id,
  warehouse_city,
  target_city,
  paid_time,
  dispatched_time,
  transit_time,
  delivery_time,
  received_time,
  -- Stage durations in minutes
  TIMESTAMPDIFF(MINUTE, paid_time, dispatched_time) AS warehouse_processing_minutes,
  TIMESTAMPDIFF(MINUTE, dispatched_time, transit_time) AS first_mile_minutes,
  TIMESTAMPDIFF(MINUTE, transit_time, delivery_time) AS line_haul_minutes,
  TIMESTAMPDIFF(MINUTE, delivery_time, received_time) AS last_mile_minutes,
  TIMESTAMPDIFF(MINUTE, paid_time, received_time) AS total_delivery_minutes,
  -- Overdue classification
  CASE
    WHEN TIMESTAMPDIFF(MINUTE, paid_time, received_time) > 1440 THEN 'OVERDUE'  -- > 24 hours
    WHEN TIMESTAMPDIFF(MINUTE, paid_time, received_time) > 720 THEN 'AT_RISK'   -- > 12 hours
    ELSE 'ON_TIME'
  END AS delivery_status
FROM order_logistics;
-- Logistics efficiency monitoring result table (Elasticsearch)
CREATE TABLE logistics_efficiency_monitor (
  order_id STRING,
  warehouse_id STRING,
  warehouse_city STRING,
  target_city STRING,
  total_delivery_minutes INT,
  delivery_status STRING,
  warehouse_processing_minutes INT,
  first_mile_minutes INT,
  line_haul_minutes INT,
  last_mile_minutes INT,
  monitoring_time TIMESTAMP(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://es-node1:9200;http://es-node2:9200',
  'index' = 'logistics_efficiency_{monitoring_time|yyyyMMdd}',   -- dynamic daily index
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.max-actions' = '1000',
  'sink.bulk-flush.max-size' = '2mb',
  'sink.bulk-flush.interval' = '10s',
  'format' = 'json'
);
-- Write logistics efficiency monitoring data
INSERT INTO logistics_efficiency_monitor
SELECT
  order_id,
  warehouse_id,
  warehouse_city,
  target_city,
  total_delivery_minutes,
  delivery_status,
  warehouse_processing_minutes,
  first_mile_minutes,
  line_haul_minutes,
  last_mile_minutes,
  CURRENT_TIMESTAMP AS monitoring_time
FROM logistics_efficiency_metrics
WHERE received_time IS NOT NULL;   -- only orders that have already been delivered

4. Abnormal order detection and handling
-- View that detects abnormal orders
CREATE VIEW abnormal_order_detection AS
SELECT
  order_id,
  user_id,
  current_status,
  status_updated_time,
  create_time,
  -- Classify the timeout by lifecycle stage
  CASE
    WHEN current_status = 'PENDING_PAYMENT'
      AND TIMESTAMPDIFF(MINUTE, create_time, CURRENT_TIMESTAMP) > 30 THEN 'PAYMENT_TIMEOUT'
    WHEN current_status = 'PROCESSING'
      AND TIMESTAMPDIFF(HOUR, paid_time, CURRENT_TIMESTAMP) > 24 THEN 'PROCESSING_TIMEOUT'
    WHEN current_status = 'SHIPPED'
      AND TIMESTAMPDIFF(HOUR, shipped_time, CURRENT_TIMESTAMP) > 72 THEN 'DELIVERY_TIMEOUT'
    ELSE NULL
  END AS abnormal_type,
  -- Overtime in minutes
  CASE
    WHEN current_status = 'PENDING_PAYMENT'
      THEN TIMESTAMPDIFF(MINUTE, create_time, CURRENT_TIMESTAMP) - 30
    WHEN current_status = 'PROCESSING'
      THEN TIMESTAMPDIFF(MINUTE, paid_time, CURRENT_TIMESTAMP) - 1440
    WHEN current_status = 'SHIPPED'
      THEN TIMESTAMPDIFF(MINUTE, shipped_time, CURRENT_TIMESTAMP) - 4320
    ELSE 0
  END AS overtime_minutes,
  -- Number of previously recorded abnormal orders for the same user
  (SELECT COUNT(*) FROM abnormal_order_handling h
     JOIN order_lifecycle ol ON h.order_id = ol.order_id
     WHERE ol.user_id = o.user_id) AS user_abnormal_count,
  CURRENT_TIMESTAMP AS detection_time
FROM order_lifecycle o
-- Timeout conditions
WHERE (
    (current_status = 'PENDING_PAYMENT'
      AND TIMESTAMPDIFF(MINUTE, create_time, CURRENT_TIMESTAMP) > 30)
    OR
    (current_status = 'PROCESSING'
      AND TIMESTAMPDIFF(HOUR, paid_time, CURRENT_TIMESTAMP) > 24)
    OR
    (current_status = 'SHIPPED'
      AND TIMESTAMPDIFF(HOUR, shipped_time, CURRENT_TIMESTAMP) > 72)
  )
  -- Exclude orders that already have a handling record
  AND order_id NOT IN (SELECT order_id FROM abnormal_order_handling);
-- Abnormal order handling instruction table
CREATE TABLE abnormal_order_handling (
  order_id STRING,
  abnormal_type STRING,
  overtime_minutes INT,
  user_abnormal_count INT,
  detection_time TIMESTAMP(3),
  handler STRING,
  handling_action STRING,
  handling_time TIMESTAMP(3),
  status STRING,                -- PENDING, PROCESSED, RESOLVED
  notes STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-order:3306/order_db',
  'table-name' = 'abnormal_order_handling',
  'username' = 'flink_user',
  'password' = 'flink_password'
);
-- Automatically generate handling instructions for abnormal orders
INSERT INTO abnormal_order_handling
SELECT
  order_id,
  abnormal_type,
  overtime_minutes,
  user_abnormal_count,
  detection_time,
  -- Assign a handler based on abnormality type and the user's history
  CASE
    WHEN user_abnormal_count > 5 THEN 'vip_customer_service'
    WHEN abnormal_type = 'DELIVERY_TIMEOUT' THEN 'logistics_support'
    ELSE 'order_support'
  END AS handler,
  -- Suggested handling action
  CASE
    WHEN abnormal_type = 'PAYMENT_TIMEOUT' THEN 'CANCEL_ORDER'
    WHEN abnormal_type = 'PROCESSING_TIMEOUT' THEN 'ESCALATE_PROCESSING'
    WHEN abnormal_type = 'DELIVERY_TIMEOUT' THEN 'CHECK_LOGISTICS'
  END AS handling_action,
  CURRENT_TIMESTAMP AS handling_time,
  'PENDING' AS status,
  CASE
    WHEN user_abnormal_count > 5 THEN 'High-risk customer, manual review required'
    ELSE 'Auto-generated handling instruction'
  END AS notes
FROM abnormal_order_detection;

V. Data Output Layer
1. Real-time monitoring metrics output
-- Real-time order processing metrics table
-- NOTE: Flink SQL ships no built-in Prometheus table connector; the options below
-- assume a third-party remote-write sink registered under the identifier 'prometheus'
CREATE TABLE order_processing_metrics (
  metric_time TIMESTAMP(3),
  order_count BIGINT,
  paid_count BIGINT,
  shipped_count BIGINT,
  delivered_count BIGINT,
  cancelled_count BIGINT,
  avg_payment_time DOUBLE,
  avg_processing_time DOUBLE,
  avg_delivery_time DOUBLE,
  abnormal_order_rate DOUBLE,
  PRIMARY KEY (metric_time) NOT ENFORCED
) WITH (
  'connector' = 'prometheus',
  'url' = 'http://prometheus-server:9090/api/v1/write',
  'namespace' = 'ecommerce',
  'metric.name' = 'order_processing_metrics'
);
-- Compute and emit order processing metrics
INSERT INTO order_processing_metrics
SELECT
  TUMBLE_START(status_updated_time, INTERVAL '5' MINUTE) AS metric_time,
  COUNT(DISTINCT o.order_id) AS order_count,
  COUNT(DISTINCT CASE WHEN current_status = 'PAID' THEN o.order_id END) AS paid_count,
  COUNT(DISTINCT CASE WHEN current_status = 'SHIPPED' THEN o.order_id END) AS shipped_count,
  COUNT(DISTINCT CASE WHEN current_status = 'DELIVERED' THEN o.order_id END) AS delivered_count,
  COUNT(DISTINCT CASE WHEN current_status = 'CANCELLED' THEN o.order_id END) AS cancelled_count,
  -- Average time to payment (seconds)
  AVG(TIMESTAMPDIFF(SECOND, create_time, paid_time)) AS avg_payment_time,
  -- Average processing time (seconds)
  AVG(TIMESTAMPDIFF(SECOND, paid_time, shipped_time)) AS avg_processing_time,
  -- Average delivery time (seconds)
  AVG(TIMESTAMPDIFF(SECOND, shipped_time, delivered_time)) AS avg_delivery_time,
  -- Abnormal order rate
  CASE WHEN COUNT(DISTINCT o.order_id) = 0 THEN 0
       ELSE COUNT(DISTINCT CASE WHEN a.abnormal_type IS NOT NULL THEN o.order_id END) * 1.0 / COUNT(DISTINCT o.order_id)
  END AS abnormal_order_rate
FROM order_lifecycle o
LEFT JOIN abnormal_order_detection a ON o.order_id = a.order_id
-- 5-minute tumbling window (requires status_updated_time to carry a watermark/time attribute)
GROUP BY TUMBLE(status_updated_time, INTERVAL '5' MINUTE);

2. Downstream notifications and integration
-- Order status change notification table (Kafka)
CREATE TABLE order_status_notifications (
  notification_id STRING,
  order_id STRING,
  user_id STRING,
  status STRING,                -- order status (OrderStatus values)
  prev_status STRING,           -- previous status
  status_time TIMESTAMP(3),
  notification_type STRING,     -- APP_PUSH, SMS, EMAIL
  message_content STRING,
  priority INT,
  create_time TIMESTAMP(3),
  send_status STRING            -- delivery state of the notification, e.g. PENDING
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_status_notifications',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'format' = 'json'
);
-- Generate order status change notifications
INSERT INTO order_status_notifications
SELECT
  UUID() AS notification_id,
  order_id,
  user_id,
  status,
  prev_status,
  status_time,
  -- Notification channel, chosen by status type
  CASE
    WHEN status IN ('CANCELLED', 'REFUNDED') THEN 'SMS'
    WHEN status = 'DELIVERED' THEN 'APP_PUSH'
    ELSE 'APP_PUSH'
  END AS notification_type,
  -- Message content generated from the status
  CASE status
    WHEN 'PAID' THEN CONCAT('Order ', order_id, ' has been paid successfully')
    WHEN 'SHIPPED' THEN CONCAT('Order ', order_id, ' has been shipped')
    WHEN 'DELIVERED' THEN CONCAT('Order ', order_id, ' has been delivered')
    WHEN 'CANCELLED' THEN CONCAT('Order ', order_id, ' has been cancelled: ', cancel_reason_standardized)
    WHEN 'REFUNDED' THEN CONCAT('Order ', order_id, ' has been refunded')
    ELSE CONCAT('Order ', order_id, ' status updated to ', status)
  END AS message_content,
  -- Notification priority
  CASE
    WHEN status IN ('CANCELLED', 'REFUNDED') THEN 1
    WHEN status IN ('PAID', 'DELIVERED') THEN 2
    ELSE 3
  END AS priority,
  CURRENT_TIMESTAMP AS create_time,
  'PENDING' AS send_status
FROM cleaned_order_status_events
-- Only notify on key status transitions
WHERE status IN ('PAID', 'SHIPPED', 'DELIVERED', 'CANCELLED', 'REFUNDED');

VI. System Optimization and Advanced Features
1. State management and optimization
-- Bound the lifetime of Flink SQL state so stale per-order state is cleaned up automatically
SET 'table.exec.state.ttl' = '86400000';   -- keep state for 1 day

-- Latest-status-per-order view using the standard deduplication pattern
CREATE VIEW order_status_with_ttl AS
SELECT order_id, user_id, status, status_time
FROM (
  SELECT
    order_id,
    user_id,
    status,
    status_time,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY status_time DESC) AS rn
  FROM order_status_events
)
WHERE rn = 1;   -- keep only the most recent status per order

2. Dual-stream join optimization
-- Order/inventory join with a cached lookup
-- NOTE: the cache options below are JDBC-connector lookup options; they only take
-- effect if product_inventory is exposed through a lookup-capable connector
CREATE VIEW order_inventory_joined AS
SELECT
  o.order_id,
  o.status,
  o.status_time,
  oi.sku_id,
  oi.quantity,
  pi.available_stock,
  pi.frozen_stock,
  pi.total_stock
FROM cleaned_order_status_events o
JOIN order_item_events oi ON o.order_id = oi.order_id
-- Lookup join with caching enabled via a dynamic table option hint
JOIN product_inventory /*+ OPTIONS('lookup.cache.max-rows'='10000', 'lookup.cache.ttl'='30s') */
  FOR SYSTEM_TIME AS OF o.event_time AS pi
  ON oi.sku_id = pi.sku_id
WHERE o.status = 'PAID';
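As noted above, the MySQL CDC source does not support lookup caching, so the hint assumes a lookup-capable variant of the inventory table. The following is a minimal sketch of a JDBC-backed table that could serve that role, assuming the same underlying MySQL table and credentials as earlier (the view above would then join product_inventory_jdbc instead of product_inventory):
-- Assumed JDBC lookup variant of the inventory table (illustrative)
CREATE TABLE product_inventory_jdbc (
  product_id STRING,
  sku_id STRING,
  total_stock INT,
  available_stock INT,
  frozen_stock INT,
  locked_stock INT,
  update_time TIMESTAMP(3),
  PRIMARY KEY (product_id, sku_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-inventory:3306/inventory_db',
  'table-name' = 'product_inventory',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'lookup.cache.max-rows' = '10000',   -- cache size for lookup joins
  'lookup.cache.ttl' = '30s'           -- cache entry expiry
);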
3. Dynamic configuration and rule engine
-- Rule configuration table (MySQL, ingested via CDC)
CREATE TABLE order_processing_rules (
  rule_id STRING,
  rule_type STRING,             -- INVENTORY_RULE, LOGISTICS_RULE, NOTIFICATION_RULE
  priority INT,
  condition_expr STRING,
  action_expr STRING,
  effective_time TIMESTAMP(3),
  expire_time TIMESTAMP(3),
  status STRING,                -- ACTIVE, INACTIVE
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3),
  PRIMARY KEY (rule_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-config',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = 'flink_password',
  'database-name' = 'config_db',
  'table-name' = 'order_processing_rules'
);
-- Apply dynamic rules to orders
CREATE VIEW order_processing_with_rules AS
SELECT
  o.*,
  r.rule_id,
  r.action_expr
FROM cleaned_order_status_events o
JOIN order_processing_rules r
  ON r.rule_type = 'INVENTORY_RULE'
 AND r.status = 'ACTIVE'
 AND o.status_time >= r.effective_time
 AND (r.expire_time IS NULL OR o.status_time < r.expire_time)
 -- A scalar UDF could be plugged in here to evaluate condition_expr dynamically
 AND o.status = 'PAID';
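As the comment above hints, evaluating condition_expr dynamically would typically go through a user-defined scalar function. A minimal sketch of how such a function might be registered and used follows; the class name and argument list are hypothetical illustrations, not part of the original system.
-- Register a (hypothetical) rule-evaluation UDF packaged in the job JAR
CREATE TEMPORARY FUNCTION eval_rule AS 'com.example.rules.EvalRuleFunction';
-- It could then replace the fixed o.status = 'PAID' predicate in the join, e.g.:
--   AND eval_rule(r.condition_expr, o.status, o.ext_info) = TRUE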
VII. System Monitoring and Operations
1. Data quality monitoring
-- Data quality metrics table (Elasticsearch)
CREATE TABLE data_quality_metrics (
  metric_time TIMESTAMP(3),
  source_table STRING,
  total_records BIGINT,
  null_order_id_count BIGINT,
  late_records_count BIGINT,
  schema_violation_count BIGINT,
  avg_processing_time DOUBLE,
  error_rate DOUBLE
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://es-node1:9200;http://es-node2:9200',
  'index' = 'data_quality_metrics_{metric_time|yyyyMMdd}',
  'format' = 'json'
);
-- Monitor data quality of the order event stream
INSERT INTO data_quality_metrics
SELECT
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS metric_time,
  'order_status_events' AS source_table,
  COUNT(*) AS total_records,
  COUNT(CASE WHEN order_id IS NULL THEN 1 END) AS null_order_id_count,
  COUNT(CASE WHEN status_time < event_time - INTERVAL '5' SECOND THEN 1 END) AS late_records_count,
  CAST(0 AS BIGINT) AS schema_violation_count,   -- would have to come from format/DDL validation metrics
  AVG(TIMESTAMPDIFF(MILLISECOND, status_time, event_time)) AS avg_processing_time,
  CASE WHEN COUNT(*) = 0 THEN 0
       ELSE COUNT(CASE WHEN order_id IS NULL OR status IS NULL THEN 1 END) * 1.0 / COUNT(*)
  END AS error_rate
FROM order_status_events
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);

2. Slow query monitoring
-- Enable Flink query monitoring
-- NOTE: the profiling keys below are placeholders; adapt them to the monitoring
-- options actually available in your Flink distribution
SET 'execution.profile.enabled' = 'true';
SET 'execution.profile.sample-interval' = '1000';
SET 'execution.profile.delay' = '0';

-- Query performance monitoring table
CREATE TABLE query_performance_metrics (
  query_id STRING,
  job_name STRING,
  start_time TIMESTAMP(3),
  end_time TIMESTAMP(3),
  duration_ms BIGINT,
  rows_read BIGINT,
  rows_written BIGINT,
  peak_memory_usage BIGINT,
  state_size BIGINT,
  backpressure_count INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-monitor:3306/monitor_db',
  'table-name' = 'query_performance_metrics',
  'username' = 'flink_user',
  'password' = 'flink_password'
);

This system, built on Flink SQL, forms a complete e-commerce order status tracking and real-time processing platform covering the entire flow from data ingestion through processing to output. Its key characteristics:
- Completeness: covers order status tracking, inventory management, logistics dispatch, and other core e-commerce flows
- Real-time: builds on Flink's stream processing to react to and handle status changes with millisecond-level latency
- Reliability: checkpointing, state management, and idempotency safeguards keep data consistent
- Extensibility: the modular design supports dynamic business-rule changes and new features
- Observability: comprehensive metrics collection and monitoring keep the system running stably





























