貨拉拉Flink CDC實踐:穩定性建設與數據入湖新探索
一、貨拉拉業務背景介紹
1. 貨拉拉背景介紹
貨拉拉是一家拉貨搬家跑腿發長途平臺,創立于 2013 年,成長于粵港澳大灣區,是從事同城/跨城貨運、企業版物流服務、搬家、零擔、跑腿、冷運、汽車租售及車后市場服務的互聯網物流商城。通過共享模式整合社會運力資源,完成海量運力儲備,并依托移動互聯、大數據和人工智能技術,搭建“方便、科技、可靠”的貨運平臺,實現多種車型的即時智能調度,為個人、商戶及企業提供高效的物流解決方案。
2. 業務整體增長情況

截至 2023 年 12 月,貨拉拉業務范圍覆蓋全球 11 個市場,包括中國及東南亞、南亞、南美洲等地區,其中中國內地總共覆蓋 363 座城市,月活司機達 90 萬,月活用戶達 1200 萬,每天產生訂單、司機、汽車物聯網數據量達到 PB 級別。如何穩定、高效、快速采集到這些數據,挖掘業務數據價值,釋放新質生產力成為公司運營和決策的關鍵。
3. 業務攀升的穩定性挑戰

隨著企業業務量的急速攀升,逐漸遇到新的挑戰,首先是實時抽數延遲嚴重,導致下游 Flink 的雙流 Join 產生問題,并帶來數據時效性、數據鏈路穩定性等問題。早期使用 Canal 作為實時數采集主要存在以下問題:
- 架構陳舊:單節點部且非分布式運行,維護頻率低。
- Canal 維護性差:可維護性差,Canal 社區的整體上下游處于不活躍,導致維護性成本特別高。
- 上游數據采集穩定性差,結合歷史故障以及冒煙測試,發現實時數據采集穩定性主要集中在上游數據采集端。
接下來將介紹貨拉拉實時數據采集改造為什么選擇 Flink CDC 作為新的實時數據采集和同步框架。
二、貨拉拉為何選擇 Flink CDC
1. 選擇四象限作為思考切入點

首先我們會從上述四點去考慮到底需要一款什么工具作為貨拉拉的實時數據同步工具。
- 功能性:實時數據平臺首先考慮完善的功能性,Flink SQL 目前開源版本僅支持單表單庫同步,如果業務方想完成其同步作業的話,必須使用 SQL 或 Flink CDC3.0 的 yaml 配置化方式才能完成整庫同步開發。
- 對標 Canal 兼容性:歷史業務方使用 Canal 進行數據采集,以及下游不限于大數據團隊的消費方均使用 Canal,因此要對部分 Canal 功能進行兼容性對標,已實現業務感知和改動最小化。
- 鏈路穩定性保障:涉及下游任務方的改造,當前只能通過 Kafka 消費組獲取下游消費方,因此希望下游消費方無需做過多改動,如 SQL 任務下游僅需切換 CDC 數據源即可;同時包裝了一個消費 CDC 的 SDK 供業務使用,依據相關 topic 命名規則即可完成整個鏈路切換,保障鏈路切換的穩定性。
- 保障數據一致性:鏈路切換時希望保障數據的一致性,即最終數據結果是等價的。因此需要通過一些科學的數據驗證手段,如雙跑驗證、采用對數工具,保證數據最終一致。
2. 開源組件對比

我們在進行實時數據同步調研時對一些開源組件的功能、使用場景、穩定性以及社區生態等多方面進行了對比,包括 Flink CDC、Canal、Apache SeaTunnel 以及 DataX。
- CDC 同步機制:傳統數據同步方面,DataX 只支持查詢的 CDC 操作。Flink CDC 只需要訂閱 binlog 即可完成數據采集比較服務業務訴求。
- 全量+增量同步:只有 Flink CDC 支持全量+增量數據同步,滿足貨拉拉某些場景下采集全量數據構建湖倉一體,業務需要持續性地對歷史數據進行全量采集并加上增量數據同步,而其他組件在此方面表現為不支持或部分支持。
- 部署形態:由于 Flink CDC 是依托于 Flink 的底層架構,Flink 本身采用分布式部署,架構選型會考慮 Flink CDC 在數據采集階段以及下游消費階段的整體的一些協調性。
- 穩定性:Flink CDC 依靠于 Flink 的 HA 機制,包括 ZooKeeper 以及 on K8s 的高可用,整體上會更加傾向于 Flink CDC 作為實時鏈路的數據同步工具。
3. 未來數據入湖需求

我們正在建設的數據入湖,也做了一些面向未來的設計,包括 CDC 數據入湖分析,數據時效性高且為結構化數據,而埋點數據時效性低且非結構化數據,以及日志數據需要間接性統計和分析,并且為非結構樹數據。這里我們需要通過引入 CDC pipeline 機制對接 Paimon Yaml 配置,便可通過 CDC 將傳統 MySQL 數據庫直接訂閱入湖到 Paimon,然后進行數據加工等 ETL 相關操作。

經過前期的深度思考、對比與總結最終形成了如上圖所示的架構,主要包括數據來源、業務場景、數據服務以及數據湖平臺、數據引擎、湖倉格式、數據存儲層以及業務等。數據內部開發平臺主要是元數據平臺(元初)、離線數據平臺(IDP)以及實時數據開發平臺(飛流);數據湖平臺主要包含數據集成服務和湖倉優化服務。數據集成服務采用 Flink CDC 實時采集把數據源的數據訂閱到湖倉里面,并通過 Amoro 進行自動優化湖倉,從而達到湖倉一體的整體架構。在執行引擎方面當前只是完成了基于 Flink Engine 的建設,對于灰色的 Doris Engine、Spark Engine 以及 Presto Engine 將是 2025 年的建設重點,數據加工完成后將輸送給業務方,如埋點業務、業務畫像以及實時大屏、同時也會輸出給內部 GPT 項目等提供給業務方去使用。
三、貨拉拉 CDC 生產實踐
1. 飛流實時計算平臺能力建設

飛流作為貨拉拉的實時計算平臺,為了很好的對接 Flink CDC,實時數據計算平臺進行了升級優化,主要包括以下幾個方面:
- 平臺感知能力:修改了很多底層代碼,新增了 Metrics 的一些能力,如把 DB 底層的 Metrics 進行了封裝,連同 Flink 的 Metrics 一并上報,形成報警能力,便于業務及時發現 DB 底層的整體采集狀況。
- 平臺配置化能力:對 Flink CDC 的 catalog 做了一層封裝,同時支持 Flink Yaml 的配置化方式,提供了更多的靈活性。
- 平臺數據協議優化:由于采用 Flink CDC Connector 進行二次開發,當前對數據協議進行了二次封裝,把內部的 DB 層數據進行打寬,并增加了一些原始字段,支持業務方消費這些數據,同時做到了傳統數據庫的采集數據落庫。
- 數據解析優化:通過增加元數據字段的一些信息,提高了在數據協議和數據解析的速度。
- SDK 封裝:由于 CDC 數據的使用者不僅包括大數據內部平臺,還包含很多線上業務方,因此封裝了一套 SDK,屏蔽 CDC 相對業務方比較復雜的概念與邏輯,交付業務方使用。
從數據架構層面,目前正在做的是統一數據采集的工作,如海內網逐步推進整體使用 Flink CDC 替換掉 Canal,以及一鍵入倉、一鍵入湖的工作,甚至一些流量回放業務場景。在數據遷移方面,我們也會用到 Flink CDC。
穩定性方面,引入了限流的能力,如會限制 sink 的采集速度,避免在采集高風險期引起數據庫的整體壓力。采集性能方面引入了多線程處理,提升解析能力。同時做了全局血緣的關聯,用于快速感知業務方使用 CDC 表,以及 CDC 采集數據影響下游任務,可以快速讓業務方感知采集出現問題時會導致哪些業務受到影響。
以上就是對飛流實時計算平臺整體能力的介紹。
2. 常規對數方法校驗

由于采用 Flink CDC 代替了 Canal 進行實時數據采集,因此需要進行數據校驗和對比。首先在常規對數方面,對特殊字段類型,如時間類型、bigInt、dynamic 等特殊字段的數據一致性校驗,同時基于時間切片做了 count 統計操作。由于消費方在大數據內部,因此還會涉及到數倉分層逐層對數的校驗,這里我們使用 Flink batch task 在維度時間對齊、最終切片對齊的最大差異、差異占比以及差異分布等方面進行統一對數。
3. 數據科學方法校驗

上文提到使用 Flink batch task 進行統一對數,主要會在基于差異率的正負進行分布式對數,差異統計表、全局指標的差值以及與 Canal 對比差異的趨勢率。如上圖可以看到,可通過總條數以及每一個時間切片上面每一個數據的準確性進行整體對比,確保從 ODS 到 DWD 以及 DWS 層整體鏈路數據準確性和最終一致性,如果出現數據缺少將會主動進行排查。
4. 數據雙跑校驗

還會通過數據雙跑進行數據校驗,如通過生產 Kafka 和驗證 Kafka 去進行數據交叉鏈路驗證對比,然后基于 binlog 采集時間對比這一段時間的數據總數以及數據的準確性進而得出一個交叉率,當兩部分數據完全一致時交叉率應該是 100%,最終會輸出一份報告給到業務方,使業務方信任,并推動業務使用鏈路切換工作順利開展。
5. Schema Change 信息變更處理

由于基于 Flink CDC Connector 進行開發,只有 3.0 才支持 Schema 變更操作,當前做法是把 Schema Change 通過一個測流發送到對應告警的 Kafka topic,并通過消費再發出一個告警卡片,同時會將此任務告警和下一個任務 Flink taskId 進行關聯,通知下游業務方 Schema 變更消息。后續我們將接入 CDC3.X Pipeline Connector,進行定制化開發,提供分流告警和下游支持等。
6. Canal VS Flink CDC 穩定性對比

下面介紹一下切換后的整體穩定性。以某一真實在線業務為例,在下午高峰期采集的時候,使用 Canal 最大的延遲在 3030s 左右,而使用 Flink CDC 基本維持在毫秒級別。在采集的整體穩定性方面,可以看到 CDC 整體采集穩定性要比 Canal 有顯著提升,最高可提高 80 倍。采集波動率方面,Canal 采集按照 Batch 作業有批量的波動,而 CDC 則保持在一個穩定的水平。

截止到目前,我們已經有 100+ 個 CDC 采集業務,其中有 70+ 是之前的 Canal 任務切換到 Flink CDC,后續海外一些 Canal 采集也將會采用 Flink CDC 代替。
整體上延遲最高下降了 80%,同時我們基于協議進行改造,因此消息中間件的數據存量也下降了 30%,并且完成了一些核心應用加關鍵線上業務的接入。上圖給出了整體延遲的 1h 截圖,可發現使用 Flink CDC 的數據采集基本上穩定保持在 1s 左右,可以比較好地保持數據的新鮮度。
7. 建設成果

整體建設成果方面,當前通過訂閱關系型數據庫,通過飛流平臺使用 Flink 作業進行數據采集,寫入到 Kafka 或流入數據湖組件上,后續經過離線 ETL 加工輸出后生成一些報表。目前公司內部業務包括小伙拉行、貨拉拉、跑腿等多個業務線使用 Flink CDC 代替了原先的 Canal 進行實時數據鏈路采集,整體業務數據量達到 TB-PB 級別,并且多個實時看板、云臺、BI 報表以及交易 2.0 等業務也使用 Flink CDC 進行實時數據采集。最終我們希望可以實現數據訂閱鏈路的“以舊換新“,后續將持續對老鏈路的替換,最終完成平臺化工程建設。
四、CDC 數據入湖&未來展望

結合公司內部使用場景以及阿里最新發布的 Fluss 項目,為我們帶來了一些新的想法。如上圖,業務數據經過 CDC 訂閱同步后進入到 Fluss,Fluss 將消費 CDC 的數據產生 changeLog,并將這個 changeLog 給到 Flink 下游繼續去消費。同時也會通過 Compaction Service 生成數據到 LakeHouse Storage,這一部分數據通過 Compaction Service 生成一些湖格式的表,如 Paimon 或 Iceberg 表,這些表可以通過外表的形式給到 OLAP 引擎或流計算引擎進行查詢。同時在 Flink 的 source 一端做合并讀的操作,如把 LakeHouse storage 進行合并讀從而屏蔽掉用戶對流和批的差異。
當然這樣將數據引入到 LakeHouse storage 會帶來讀放大的問題,可以引入 Amoro 持續優化 Paimon 和 Iceberg 表減少小文件的數量,同時在為下游消費這部分 CDC 數據時帶來更好的體驗。

當前我們正在探索 Flink CDC+數據湖(Paimon 和 Iceberg),并結合 Apache Amoro 實現全自動數據入湖,形成完整的數據入湖生態體系,進一步提升數據時效性和準確性,以滿足業務方對數據新鮮度的需求。并將與數據湖開源社區開展深度合作與探討,把場景固化,加速湖倉一體落地的進程。
我們還會考慮多數據源訂閱的需求,滿足關系型和非慣性數據的訂閱查詢,如支持 MongoDB 數據的訂閱,構建貨拉拉統一實時采集和湖倉數據生態。

































