搞懂 Spring Batch
在日常開發過程中,我們或多或少都會涉及到數據報表、統計分析、定時任務之類的應用場景。針對這些場景,我們可以采用 Hadoop 生態圈中的相關技術。
但是 Hadoop 是一種重量級的實現方案,實際應用過程中存在入門門檻過高、學習周期過長、開發和維護困難等問題,對于某些體量并不是特別大的應用場景而言并不建議使用。相反,我們希望找到一種輕量級實現方案來支持日常批處理功能,這就是今天我們要討論的話題。

圖 1 批處理需求和實現方案
那么,如何實現輕量級的批處理呢?讓我們先從相關設計理念開始講起。
輕量級批處理基本架構
在考慮批處理架構之前,我們站在最高的抽象度上,可以把批處理過程看作是一個流程,包括讀數據、處理數據和寫數據,而這些數據背后是各種數據存儲媒介。

圖 2 批處理流程的抽象
批處理架構的抽象過程
和普通應用程序一樣,對于如何實現上述流程,我們第一個需要考慮的設計問題是如何確定所需要實現組件之間的職責和功能,這就需要引入分層思想。
分層結構上,批處理架構可以抽象為三個主要層次,基礎架構層、核心處理層和應用開發層。
基礎架構層提供了通用的讀、寫、處理服務,是對各種數據媒介的操作封裝;核心處理層關注于批處理的執行過程,包括對批處理任務和流程的抽象以及如何啟動、控制這些任務與流程;應用開發層則包含應用程序需要實現的業務代碼。

圖 3 批處理技術的分層架構
有了分層架構之后,我們接下來對批處理的處理對象進行建模,從而引出任務(Job)的概念。Job 就是批處理的基本對象,每個 Job 可以包含一個或多個步驟(Step),每個 Step 負責與具體的外部媒介交互,并產生計算結果。
我們知道,對于批處理應用而言,處理的對象并不是一條數據,而是一批數據的集合(Batch)。因此,在讀取數據階段,讀取器(Reader)可以單條執行讀取操作,并交由數據處理器(Processor)進行轉換或過濾處理,但在寫數據的過程中,數據寫入器(Writer)往往會以一批數據為基本操作單元。

圖 4 批處理時序圖
批處理的健壯性
在上面這個時序圖中,每一步都可能出現問題。因此,我們需要在出現問題時仍然能夠確保批處理流程執行完畢,這就需要引入健壯性(Robustness)的概念。我們可以把批處理的健壯性簡單理解為是一種智能化機制,即在長時間不需要開發人員或業務人員干預的情況下仍然可以自動處理各種異常情況。
那么,如何實現健壯性呢?結合批處理的處理特性,我們可以梳理健壯性的不同實現策略,常見的保護三種,即忽略、重試和重啟。

圖 5 批處理健壯性的三種策略
忽略的含義在于,對于那些并不影響批處理執行流程的異常情況,我們沒有必要停止整個任務,而是可以選擇性地忽略這些異常。場景可以忽略的異常包括數字格式錯誤等。
有時候,導致任務執行出現異常的原因并不是數據或代碼有問題,而是那些瞬態異常,常見的包括網絡訪問失敗或數據庫鎖等。針對這些瞬態異常,我們可以采取帶有重試次數限制的重試策略。
與前面兩種情況不同,有時候因為業務處理異常同樣會導致批處理執行失敗。顯然這時候采用忽略或重試策略是解決不了問題的。我們需要暫停任務,然后修復代碼問題之后再重新執行任務,這就是重啟策略。
在一個成熟的批處理基本架構中,開發人員可以綜合使用這三種健壯性處理策略。而這三種策略的采用時機也是可以動態調整的,典型的例子包含:剛開始出現異常情況時,我們可以采用重試機制,但當重試出現三次之后如果仍然拋出異常,那么我們就需要轉為采用重啟策略了。
輕量級批處理框架:Spring Batch
介紹完輕量級批處理的基本架構之后,我們來討論它的實現工具。業界有許多輕量級批處理框架,今天我主要給你介紹的是,在 Spring 家族中專門針對輕量級批處理技術提供的相應解決方案,這就是 Spring Batch。
Spring Batch 基于 Spring 和 Java,實現了批處理的基本架構,并支持批處理健壯性。Spring Batch 內置包括文件、數據庫、消息中間件、外部服務在內的多種數據讀取和寫入機制,也對數據處理過程做了轉換和過濾抽象。
針對使用場景,Spring Batch 也給提供了系統化的支持。使用 Spring Batch 可以應用于定期提交批處理任務、按順序處理依賴的任務、部分處理、批處理事務支持以及消息傳遞等基礎設施集成等場景。
Spring Batch 的設計理念之一在于以接口形式暴露通用核心的服務并提供了完整的默認實現。Spring Batch 的核心接口如下,分別對應批處理的三個主要步驟。可以看到讀和處理操作的對象是一個 Item,而寫操作則使用 Item 列表。這點與我們前面的分析完全一致。代碼 1。
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}其中,ItemReader 和 ItemWriter 分別實現數據讀取和數據寫入,對象可以包括文本文件、XML 文件、數據庫、服務和 JMS 等多種形式。
然后,ItemProcessor 代表處理器模型,Spring Batch 中的數據處理有轉換(Transformation)和過濾(Filtering)兩種主要的場景。轉換的形式有多種,基本的數據狀態和數據結構轉換比較常見。而過濾的目的是決定是否進行 Writer 操作。無論是轉換還是過濾,Spring Batch 都為開發人員提供了擴展接口,我們可以基于業務邏輯實現自定義的復雜機制。
針對批處理的健壯性,Spring Batch 也同時支持 Skip、Retry 和 Restart 這三種策略。為了實現這三種策略,Spring Batch 對 Job 進行了進一步抽象。對于任何一個 Job,運行過程中都存在一種一對多的關系,即 Job 的定義應該只有一份,但可以有多次執行。因此,Spring Batch 中針對每個 Job 會生成一個 Job Instance,然后每次 Job 執行對應一個 Job Execution。這樣,健壯性策略在過程中會根據 Job Instance 生成一個新的 Job Execution 并放在 Job Repository 中。

圖 6 Spring Batch 中的 Job Instance 和 Execution
上圖中的 Job Repository 保存批處理運行時詳細信息,Spring Batch 支持 In-memory 和 JDBC 兩種持久化實現策略。
總結來說,站在最高的抽象層次上,所有批處理的過程都包括讀數據、處理數據和寫數據三大部分。雖然,普通的數據處理技術也可以實現這三個步驟,但一些關鍵特性使得批處理與這些數據數據技術有本質性區別。批處理面向海量數據,要求在實現自動化的前提下還需要保證處理過程的健壯性、可靠性和性能。Spring Batch 作為一款優秀的批處理開源框架,為開發人員提供了輕量級批處理的一整套解決方案。
總結
我們系統分析了輕量級批處理技術的方方面面。我們看到作為一個常見的技術體系,想要實現批處理,開發人員需要考慮的維度非常多。
我們首先站在架構設計的角度,對批處理執行過程進行了抽象,并給出了分層架構。在分層架構的基礎上,我們就引出了批處理的健壯性需求,并同樣闡述了三種實現方案。基于這些討論的設計思想,業界也存在一些輕量級批處理框架,比如 EasyBatch。今天的內容我們關注 Spring 家族的 Spring Batch 框架,可以看到該框架的實現過程和我們的設計思想是高度一致的。
































