阿里近實時增量處理技術架構解析

一、MaxCompute湖倉一體發展進程

MaxCompute作為阿里云自研的海量大數據處理平臺已經有十幾年的發展歷史,在規模和擴展性方面一直表現比較優秀。其依托阿里云飛天分布式操作系統,能夠提供快速,完全托管的EB級數據倉庫及數據湖解決方案,可經濟高效的處理海量數據。目前,其承擔著阿里集團絕大部分離線數據存儲和計算力,是阿里云產品矩陣中最重要的自研核心平臺之一。
MaxCompute發展之初,主要聚焦數倉方面的大數據處理業務場景,并且處理的數據源主要為格式化數據。隨著數據處理場景的多樣化和業界數據湖架構的興起,加上阿里集團內部本身數據也非常多,支持多樣化數據源也就成為了一個必選項。因此MaxCompute設計了完善的外表機制,可以讀取存儲在外部的多種格式的數據對象,例如Hadoop開源體系,OSS半結構化或非結構化數據,為此也盡可能設計開發統一的元數據處理架構,此階段MaxCompute在湖倉一體化解決方案中邁出了重要一步,極大的擴展了數據處理的業務場景,有效的打破數據孤島,聯動各方面的數據進行綜合分析來挖掘整體數據價值。但時效性不足,通常是T+1離線場景。
隨著用戶數和數據規模不斷增加,很多業務場景也越加復雜,需要更加完善綜合的整體解決方案。其中的關鍵環節之一就是數據需要更加高效的流轉起來,為此MaxCompute進一步設計完善開放存儲和計算架構,更好的去融合生態,讓數據可流暢的進得來也出得去。此外,還有一個重要的業務場景是大規模批量處理和高時效高效率增量處理一體化解決方案,為簡化用戶數據處理鏈路,節省不同系統之間的數據遷移成本以及冗余計算和存儲成本,我們設計開發了MaxCompute離線和近實時增量處理的一體化架構??傮w來說,現階段以及未來會基于統一的存儲、統一的元數據、統一的計算引擎有效支撐湖倉一體的整體技術架構,讓數據能夠開放互通高效流轉,并且計算和存儲成本持續優化。
二、MaxCompute近實時增量處理技術架構簡介
1、MaxCompte離線 & 近實時增量處理業務系統架構現狀

隨著當前數據處理的業務場景日趨復雜,對于時效性要求低的大規模數據全量批處理的場景,直接使用MaxCompute足以很好的滿足業務需求,對于時效性要求很高的秒級實時數據處理或者流處理,則需要使用實時系統或流系統來滿足需求。
但其實對于大部份業務場景,并不要求秒級數據更新可見,更多的是分鐘級或者小時級的增量數據處理場景,并且疊加海量數據批處理場景。
對于這類業務場景的解決方案,如果使用單一的MaxCompute離線批量處理鏈路,為了計算的高效性,需要將用戶各種復雜的一些鏈路和處理邏輯轉化成T+1的批次處理,鏈路復雜度增加,也可能產生冗余的計算和存儲成本,且時效性也較差。但如果使用單一的實時系統,資源消耗的成本比較高,性價比也較低,并且大規模數據批處理的穩定性也不足。因此當前比較典型的解決方案是Lambda架構,全量批處理使用MaxCompute鏈路,時效性要求比較高的增量處理使用實時系統鏈路,但該架構也存在大家所熟知的一些固有缺陷,比如多套處理和存儲引擎引發的數據不一致問題,多份數據冗余存儲和計算引入的額外成本,架構復雜以及開發周期長等。
針對這些問題近幾年大數據開源生態也推出了各種解決方案,最流行的就是Spark/Flink/Presto開源數據處理引擎,深度集成開源數據湖Hudi、Delta Lake和Iceberg三劍客,來綜合提供解決方案,解決Lamdba架構帶來的一系列問題,而MaxCompute近一年自研開發的離線近實時增量處理一體化架構,同樣是為了解決這些問題而設計,不僅僅具備分鐘級的增全量數據讀寫以及數據處理的業務需求,也能提供Upsert,Timetravel等一系列實用功能,可大幅擴展業務場景,并且有效的節省數據計算,存儲和遷移成本,切實提高用戶體驗。下文就將介紹該技術架構的一些典型的功能和設計。
2、MaxCompute近實時增量處理技術架構

MaxCompute近實時增量處理整體架構的設計改動主要集中在五個模塊:數據接入、計算引擎、數據優化服務,元數據管理,數據文件組織。其他部份直接復用MaxCompute已有的架構和計算流程,比如數據的分布式存儲直接集成了阿里云基礎設施盤古服務。
- 數據接入主要支持各種數據源全量和近實時增量導入功能。MaxCompute聯合相關產品定制開發多種數據接入工具,例如MaxCompute定制開發的Flink Connector,DataWorks的數據集成等,用來支持高效的近實時增量數據導入。這些工具會對接MaxCompute的數據通道服務Tunnel Server,主要支持高并發分鐘級增量數據寫入。此外,也支持MaxCompute SQL,以及其它一些接口用于支持全量數據高效寫入。
- 計算引擎主要包含MC自研的SQL引擎,負責Timetravel和增量場景下的SQL DDL/DML/DQL的語法解析,優化和執行鏈路。此外,MaxCompute內部集成的Spark等引擎也在設計開發支持中。
- 數據優化服務主要由MaxCompute的Storage Service來負責智能的自動管理增量數據文件,其中包括小文件合并Clustering,數據Compaction,數據排序等優化服務。對于其中部分操作,Storage Service會根據數據特征,時序等多個維度綜合評估,自動執行數據優化任務,盡可能保持健康高效的數據存儲和計算狀態。
- 元數據管理主要負責增量場景下數據版本管理,Timetravel管理,事務并發沖突管理,元數據更新和優化等。
- 數據文件組織主要包含對全量和增量數據文件格式的管理以及讀寫相關的模塊。
三、核心設計解剖
1、統一的數據文件組織格式
圖片
要支持全量和增量處理一體化架構首先需要設計統一的表類型以及對應的數據組織格式,這里稱為Transactional Table2.0,簡稱TT2,基本可以支持普通表的所有功能,同時支持增量處理鏈路的新場景,包括timetravel查詢、upsert操作等。
TT2要生效只需要在創建普通表時額外設置主鍵primary key(PK),以及表屬性transactional為true即可。PK列用于支持Upsert鏈路功能,PK值相同的多行記錄在查詢或者Compaction會merge成一行數據,只保留最新狀態。transactional屬性則代表支持ACID事務機制,滿足讀寫快照隔離,并且每行數據會綁定事務屬性,比如事務timestamp,用來支持timetravel查詢,過濾出正確數據版本的記錄。此外TT2的tblproperties還可以設置其他的一些可選的表屬性,比如write.bucket.num用來配置數據寫入的并發度,acid.data.retain.hours用來配置歷史數據的有效查詢時間范圍等。
TT2表數據文件存在多種組織格式用來支持豐富的讀寫場景。其中base file數據文件不保留Update/Delete中間狀態,用來支撐全量批處理的讀寫效率,delta file增量數據文件會保存每行數據的中間狀態,用于滿足近實時增量讀寫需求。
為了進一步優化讀寫效率,TT2支持按照BucketIndex對數據進行切分存儲,BucketIndex數據列默認復用PK列,bucket數量可通過配置表屬性write.bucket.num指定,數據寫入的高并發可通過bucket數量水平擴展,并且查詢時,如果過濾條件為PK列,也可有效的進行Bucket裁剪查詢優化。數據文件也可按照PK列進行排序,可有效提升MergeSort的效率,并有助于DataSkipping查詢優化。數據文件會按照列式壓縮存儲,可有效減少存儲的數據量,節省成本,也可有效的提升IO讀寫效率。
2、數據近實時流入

前面介紹了統一的數據組織格式,接下來需要考慮數據如何高效寫入TT2。
數據流入主要分成近實時增量寫入和批量寫入兩種場景。這里先描述如何設計高并發的近實時增量寫入場景。用戶的數據源豐富多樣,可能存在數據庫,日志系統或者其他消息隊列等系統中,為了方便用戶遷移數據寫入TT2, MaxCompute定制開發了Flink Connector、Dataworks數據集成以及其它開源工具,并且針對TT2表做了很多專門的設計開發優化。這些工具內部會集成MaxCompute數據通道服務Tunnel提供的客戶端SDK,支持分鐘級高并發寫入數據到Tunnel Server,由它高并發把數據寫入到每個Bucket的數據文件中。
寫入并發度可通過前面提及的表屬性write.bucket.num來配置,因此寫入速度可水平擴展。對同一張表或分區的數據,寫入數據會按pk值對數據進行切分,相同pk值會落在同一個bucket桶中。此外,數據分桶的好處還有利于數據優化管理操作例如小文件clustering,compaction等都可以桶的粒度來并發計算,提高執行效率。分桶對于查詢優化也非常有好處,可支持bucket裁剪、shuffle move等查詢優化操作。
Tunnel SDK提供的數據寫入接口目前支持upsert和delete兩種數據格式,upsert包含insert / update兩種隱含語義,如數據行不存在就代表insert,如已存在就代表update。commit接口代表原子提交這段時間寫入的數據如返回成功就代表寫入數據查詢可見,滿足讀寫快照隔離級別,如返回失敗,數據需要重新寫入。
3、SQL批量寫入
圖片
批量導入主要通過SQL進行操作。為了方便用戶操作,實現了操作TT2所有的DDL / DML語法。SQL引擎內核模塊包括Compiler、Optimizer、Runtime等都做了大量改造開發以支持相關功能,包括特定語法的解析,特定算子的Planner優化,針對pk列的去重邏輯,以及runtime構造Upsert格式數據寫入等。數據計算寫入完成之后,會由Meta Service來原子性更新Meta信息,此外,也做了大量改造來支持完整的事務機制保證讀寫隔離、事務沖突檢測等等。
4、小數據文件合并
圖片
由于TT2本身支持分鐘級近實時增量數據導入,高流量場景下可能會導致增量小文件數量膨脹,從而引發存儲訪問壓力大、成本高,并且大量的小文件還會引發meta更新以及分析執行慢,數據讀寫IO效率低下等問題,因此需要設計合理的小文件合并服務, 即Clustering服務來自動優化此類場景。
Clustering服務主要由MaxCompute 內部的Storage Service來負責執行,專門解決小文件合并的問題,需要注意的是,它并不會改變任何數據的歷史中間狀態,即不會消除數據的Update/Delete中間狀態。
結合上圖可大概了解Clustering服務的整體操作流程。Clustering策略制定主要根據一些典型的讀寫業務場景而設計,會周期性的根據數據文件大小,數量等多個維度來綜合評估,進行分層次的合并。Level0到Level1主要針對原始寫入的Delta小文件(圖中藍色數據文件)合并為中等大小的Delta文件(圖中黃色數據文件),當中等大小的Delta文件達到一定規模后,會進一步觸發Level1到Level2的合并,生成更大的Delta文件(圖中橙色數據文件)。
對于一些超過一定大小的數據文件會進行專門的隔離處理,不會觸發進一步合并,避免不必要的讀寫放大問題,如圖中Bucket3的T8數據文件。超過一定時間跨度的文件也不會合并,因為時間跨度太大的數據合并在一起的話,當TimeTravel或者增量查詢時,可能會讀取大量不屬于此次查詢時間范圍的歷史數據,造成不必要的讀放大問題。
由于數據是按照BucketIndex來切分存儲的,因此Clustering服務會以bucket粒度來并發執行,大幅縮短整體運行時間。
Clustering服務需要和Meta Service進行交互,獲取需要執行此操作的表或分區的列表,執行結束之后,會把新老數據文件的信息傳入Meta Service,它負責Clustering操作的事務沖突檢測,新老文件meta信息原子更新、老的數據文件回收等。
Clustering服務可以很好的解決大文件數量膨脹引發的一系列效率低下的讀寫問題,但不是頻率越高越好,執行一次也會消耗計算和IO資源,至少數據都要全部讀寫一遍,存在一定的讀寫放大問題。因此執行策略的選擇尤其重要,所以目前暫時不會開放給用戶手動執行,而是引擎根據系統狀態智能自動觸發執行,可保障Clustering服務執行的高效率。
5、數據文件Compaction

除了小文件膨脹問題需要解決外,依然還有一些典型場景存在其它問題。TT2支持update、delete格式的數據寫入,如果存在大量此格式的數據寫入,會造成中間狀態的冗余記錄太多,引發存儲和計算成本增加,查詢效率低下等問題。因此需要設計合理的數據文件compaction服務優化此類場景。
Compaction服務主要由MaxCompute 內部的Storage Service來負責執行,既支持用戶手動執行SQL語句觸發、也可通過配置表屬性按照時間頻率、Commit次數等維度自動觸發。此服務會把選中的數據文件,包含base file和delta file,一起進行Merge,消除數據的Update / Delete中間狀態,PK值相同的多行記錄只保留最新狀態的一行記錄,最后生成新的只包含Insert格式的base file。
結合上圖可大概了解Compaction服務的整體操作流程。t1到t3時間段,一些delta files寫入進來,觸發compaction操作,同樣會以bucket粒度并發執行,把所有的delta files進行merge,然后生成新的base file。之后t4和t6時間段,又寫入了一批新的delta files,再觸發compaction操作,會把當前存在的base file和新增的delta files一起做merge操作,重新生成一個新的base file。
Compaction服務也需要和Meta Service進行交互,流程和Clustering類似,獲取需要執行此操作的表或分區的列表,執行結束之后,會把新老數據文件的信息傳入Meta Service,它負責Compaction操作的事務沖突檢測,新老文件meta信息原子更新、老的數據文件回收等。
Compaction服務通過消除數據中間歷史狀態,可節省計算和存儲成本,極大加速全量快照查詢場景的效率,但也不是頻率越高越好,首先執行一次也要讀取一遍全量數據進行Merge,極大消耗計算和IO資源,并且生成的新base file也會占據額外的存儲成本,而老的delta file文件可能需要用于支持timetravel查詢,因此不能很快刪除,依然會有存儲成本,所以Compaction操作需要用戶根據自己的業務場景和數據特征來合理選擇執行的頻率,通常來說,對于Update / Delete格式的記錄較多,并且全量查詢次數也較多的場景,可以適當增加compaction的頻率來加速查詢。
6、事務管理
以上主要介紹了典型的數據更新操作,而它們的事務并發管理都會統一由Meta Service進行控制。
圖片
上面表格詳細展示了各個具體操作并發執行的事物沖突規則。Meta服務采用了經典的MVCC模型來滿足讀寫快照隔離,采用OCC模型進行樂觀事務并發控制。對于一些高頻的操作單獨設計優化了事務沖突檢測和重試機制,如clustering操作和insert into 并發執行,即使事務Start和Commit時間出現交叉也不會沖突失敗,都能成功執行,即使在原子提交Meta信息更新時出現小概率失敗也可在Meta層面進行事務重試,代價很低,不需要數據重新計算和讀寫。
此外,各種數據文件信息以及快照版本也需要有效的管理,其中包含數據版本、統計信息、歷史數據、生命周期等等。對于TimeTravel和增量查詢,Meta層面專門進行了設計開發優化,支持高效的查詢歷史版本和文件信息。
7、TimeTravel查詢

基于TT2,計算引擎可高效支持典型的業務場景TimeTravel查詢,即查詢歷史版本的數據,可用于回溯歷史狀態的業務數據,或數據出錯時,用來恢復歷史狀態數據進行數據糾正,當然也支持直接使用restore操作恢復到指定的歷史版本。
對于TimeTravel查詢,會首先找到要查詢的歷史數據版本之前最近的base file,再查找后面的delta files,進行合并輸出,其中base file可以用來加速查詢讀取效率。
這里結合上圖進一步描述一些具體的數據查詢場景。比如創建一TT2表,schema包含一個pk列和一個val列。左邊圖展示了數據變化過程,在t2和t4時刻分別執行了compaction操作,生成了兩個base file: b1和b2。b1中已經消除了歷史中間狀態記錄(2,a),只保留最新狀態的記錄 (2,b)。
如查詢t1時刻的歷史數據,只需讀取delta file (d1)進行輸出; 如查詢t2時刻,只需讀取base file (b1) 輸出其三條記錄。如查詢t3時刻,就會包含base file ( b1)加上delta file (d3)進行合并輸出,可依此類推其他時刻的查詢。
可見,base文件雖可用來加速查詢,但需要觸發較重的compaction操作,用戶需要結合自己的業務場景選擇合適的觸發策略。
TimeTravel可根據timestamp和version兩種版本形態進行查詢,除了直接指定一些常量和常用函數外,我們還額外開發了get_latest_timestamp和get_latest_version兩個函數,第二個參數代表它是最近第幾次commit,方便用戶獲取我們內部的數據版本進行精準查詢,提升用戶體驗。
8、增量查詢
圖片
此外,SQL增量查詢也是重點設計開發的場景,主要用于一些業務的近實時增量處理鏈路,新增SQL語法采用between and關鍵字,查詢的時間范圍是左開右閉,即begin是一個開區間,必須大于它,end是一個閉區間。
增量查詢不會讀取任何base file,只會讀取指定時間區間內的所有delta files,按照指定的策略進行Merge輸出。
通過上訴表格可進一步了解細節,如begin是t1-1,end是t1,只讀取t1時間段對應的delta file (d1)進行輸出, 如果end是t2,會讀取兩個delta files (d1和d2);如果begin是t1,end是t2-1,即查詢的時間范圍為(t1, t2),這個時間段是沒有任何增量數據插入的,會返回空行。
對于Clustering和Compaction操作也會產生新的數據文件,但并沒有增加新的邏輯數據行,因此這些新文件都不會作為新增數據的語義,增量查詢做了專門設計優化,會剔除掉這些文件,也比較貼合用戶使用場景。
9、歷史版本數據回收
由于Timetravel和增量查詢都會查詢數據的歷史狀態,因此需要保存一定的時間,可通過表屬性acid.data.retain.hours來配置保留的時間范圍。如果歷史狀態數據存在的時間早于配置值,系統會開始自動回收清理,一旦清理完成,TimeTravel就查詢不到對應的歷史狀態了?;厥盏臄祿饕僮魅罩竞蛿祿募刹糠?。
同時,也會提供purge命令,用于特殊場景下手動觸發強制清除歷史數據。
10、數據接入生態集成現狀
初期上線支持接入TT2的工具主要包括:
- DataWorks數據集成:支持數據庫等豐富的數據源表全量以及增量的同步業務。
- MaxCompute Flink Connector:支持近實時的upsert數據增量寫入,這一塊還在持續優化中,包括如何確保Exactly Once語義,如何保障大規模分區寫入的穩定性等,都會做深度的設計優化。
- MaxCompute MMA:支持大規模批量 Hive數據遷移。很多業務場景數據遷移可能先把存在的全量表導入進來,之后再持續近實時導入增量數據,因此需要有一些批量導入的工具支持。
- 阿里云實時計算Flink版Connector:支持近實時Upsert數據增量寫入,功能還在完善中。
- MaxCompute SDK:直接基于SDK開發支持近實時導入數據,不推薦
- MaxCompute SQL:通過SQL批量導入數據
對其它一些接入工具,比如Kafka等,后續也在陸續規劃支持中。
11、特點
作為一個新設計的架構,我們會盡量去覆蓋開源數據湖(HUDI / Iceberg)的一些通用功能,有助于類似業務場景的用戶進行數據和業務鏈路遷移。此外,MaxCompute離線 & 近實時增量處理一體化架構還具備一些獨特的亮點:
- 統一的存儲、元數據、計算引擎一體化設計,做了非常深度和高效的集成,具備存儲成本低,數據文件管理高效,查詢效率高,并且Timetravel / 增量查詢可復用MaxCompute批量查詢的大量優化規則等優勢。
- 全套統一的SQL語法支持,非常便于用戶使用。
- 深度定制優化的數據導入工具,支持一些復雜的業務場景。
- 無縫銜接MaxCompute現有的業務場景,可以減少遷移、存儲、計算成本。
- 完全自動化管理數據文件,保證更好的讀寫穩定性和性能,自動優化存儲效率和成本。
- 基于MaxCompute平臺完全托管,用戶可以開箱即用,沒有額外的接入成本,功能生效只需要創建一張新類型的表即可。
- 作為完全自研的架構,需求開發節奏完全自主可控。
四、應用實踐與未來規劃
1、離線 & 近實時增量處理一體化業務架構實踐

基于新架構,MaxCompute可重新構建離線 & 近實時增量處理一體化的業務架構,即可以解決大部分的Lambda架構的痛點,也能節省使用單一離線或者實時系統架構帶來的一些不可避免的計算和存儲成本。各種數據源可以方便的通過豐富的接入工具實現增量和離線批量導入,由統一的存儲和數據管理服務自動優化數據編排,使用統一的計算引擎支持近實時增量處理鏈路和大規模離線批量處理鏈路,而且由統一的元數據服務支持事務和文件元數據管理。它帶來的優勢非常顯著,可有效避免純離線系統處理增量數據導致的冗余計算和存儲,也能解決純實時系統高昂的資源消耗成本,也可消除多套系統的不一致問題和減少冗余多份存儲成本以及系統間的數據遷移成本,其他的優勢可以參考上圖,就不一一列舉了??傮w而言,就是使用一套架構既可以滿足增量處理鏈路的計算存儲優化以及分鐘級的時效性,又能保證批處理的整體高效性,還能有效節省資源使用成本。
2、未來規劃
最后再看一下未來一年內的規劃:
- 持續完善SQL的整體功能支持,降低用戶接入門檻;完善Schema Evolution支持。
- 更加豐富的數據接入工具的開發支持,持續優化特定場景的數據寫入效率。
- 開發增量查詢小任務分鐘級別的pipeline自動執行調度框架,極大的簡化用戶增量處理鏈路業務的開發難度,完全自動根據任務執行狀態觸發pipeline任務調度,并自動讀取增量數據進行計算。
- 持續繼續優化SQL查詢效率,以及數據文件自動優化管理。
- 擴展生態融合,支持更多的第三方引擎讀寫TT2。
五、Q & A
Q1:Bucket數量的設置與commit間隔以及compaction間隔設置的最佳推薦是什么?
A1:Bucket數量與導入的數據量相關,數據量越大,建議設置的bucket數量多一些,在批量導入的場景,推薦每個bucket的數據量不要超過1G,在近實時增量導入場景,也要根據Tunnel的可用資源以及QPS流量情況來決定bucket數量。對于commit的間隔雖然支持分鐘級數據可見,但如果數據規模較大,bucket數量較多,我們推薦間隔最好在五分鐘以上,也需要考慮結合 Flink Connector的checkpoint機制來聯動設置commit頻率,以支持Exactly Once語義,流量不大的話,5~10分鐘間隔是推薦值。Compaction間隔跟業務場景相關,它有很大的計算成本,也會引入額外的base file存儲成本,如果對查詢效率要求比較高且比較頻繁,compaction需要考慮設置合理的頻率,如果不設置,隨著delta files和update記錄的不斷增加,查詢效率會越來越差。
Q2:會不會因為commit太快,compaction跟不上?
A2:Commit頻率和Compaction頻率沒有直接關系,Compaction會讀取全量數據,所以頻率要低一些,至少小時或者天級別,而Commit寫入增量數據的頻率是比較快的,通常是分鐘級別。
Q3:是否需要專門的增量計算優化器?
A3:這個問題很好,確實需要有一些特定的優化規則,目前只是復用我們現有的SQL優化器,后續會持續規劃針對一些特殊的場景進行增量計算的設計優化。
Q4:剛剛說會在一兩個月邀測MaxCompute新架構,讓大家去咨詢。是全部替換為新的架構還是上線一部分的新架構去做些嘗試,是要讓用戶去選擇嗎?還是怎樣?
A4:新技術架構對用戶來說是透明的,用戶可以通過MaxCompute無縫接入使用,只需要創建新類型的表即可。針對有這個需求的新業務或者之前處理鏈路性價比不高的老業務,可以考慮慢慢切換到這條新鏈路嘗試使用。




























