Spring Batch 批處理全解析:從基礎(chǔ)到實(shí)戰(zhàn)案例
前言
在企業(yè)級(jí)應(yīng)用中,批處理是常見(jiàn)的需求場(chǎng)景,比如數(shù)據(jù)遷移、報(bào)表生成、日志分析等。這些任務(wù)往往需要處理大量數(shù)據(jù),且對(duì)執(zhí)行效率、容錯(cuò)能力有較高要求。Spring Batch作為Spring生態(tài)中專注于批處理的框架,憑借其靈活的架構(gòu)和強(qiáng)大的功能,成為批處理領(lǐng)域的主流選擇。
核心概念
Spring Batch的架構(gòu)設(shè)計(jì)遵循分層思想,核心組件可分為應(yīng)用層、核心層和基礎(chǔ)設(shè)施層,理解這些組件是使用框架的基礎(chǔ)。
核心組件
Job(作業(yè)):批處理任務(wù)的頂層抽象,代表一個(gè)完整的批處理流程。一個(gè)Job由一個(gè)或多個(gè)Step組成,例如用戶數(shù)據(jù)同步Job可能包含讀取文件Step和寫(xiě)入數(shù)據(jù)庫(kù)Step。Step(步驟):Job的最小執(zhí)行單元,是批處理的核心執(zhí)行單元。每個(gè)Step包含三大核心組件:
ItemReader:數(shù)據(jù)讀取組件,負(fù)責(zé)從數(shù)據(jù)源(文件、數(shù)據(jù)庫(kù)、消息隊(duì)列等)讀取數(shù)據(jù),例如從CSV文件讀取用戶信息。ItemProcessor:數(shù)據(jù)處理組件,用于數(shù)據(jù)轉(zhuǎn)換、清洗或過(guò)濾,例如對(duì)讀取的用戶年齡進(jìn)行校驗(yàn),過(guò)濾掉無(wú)效數(shù)據(jù)。ItemWriter:數(shù)據(jù)寫(xiě)入組件,負(fù)責(zé)將處理后的數(shù)據(jù)寫(xiě)入目標(biāo)存儲(chǔ)(數(shù)據(jù)庫(kù)、文件等)。
JobLauncher:Job的啟動(dòng)器,負(fù)責(zé)觸發(fā)Job的執(zhí)行,可通過(guò)代碼或外部調(diào)度工具(如Quartz)調(diào)用。JobRepository:Job的元數(shù)據(jù)倉(cāng)庫(kù),用于存儲(chǔ)Job的執(zhí)行狀態(tài)(如啟動(dòng)時(shí)間、結(jié)束時(shí)間、執(zhí)行結(jié)果),默認(rèn)使用數(shù)據(jù)庫(kù)存儲(chǔ)(支持主流數(shù)據(jù)庫(kù))。
批處理特性
Spring Batch 的核心優(yōu)勢(shì)體現(xiàn)在批處理的關(guān)鍵能力上:
- 容錯(cuò)機(jī)制:支持跳過(guò)錯(cuò)誤數(shù)據(jù)、重試失敗操作,避免單個(gè)數(shù)據(jù)錯(cuò)誤導(dǎo)致整個(gè)任務(wù)失敗。
- 事務(wù)管理:基于
Spring事務(wù)機(jī)制,保證Step執(zhí)行的原子性(要么全部成功,要么回滾)。 - 并行處理:支持多線程、分區(qū)等并行策略,提升大數(shù)據(jù)量處理效率。
- 可擴(kuò)展性:核心組件均可自定義實(shí)現(xiàn),適配復(fù)雜業(yè)務(wù)場(chǎng)景。
代碼示例
從CSV文件讀取用戶數(shù)據(jù),清洗后寫(xiě)入MySQL數(shù)據(jù)庫(kù),演示Spring Batch的完整使用流程。
示例數(shù)據(jù)
1,張三,25,zhangsan@example.com
2,李四,17,lisi@example.com
3,王五,30,wangwu@example.com- 處理規(guī)則:過(guò)濾年齡小于
18歲的用戶(視為無(wú)效數(shù)據(jù)),并將郵箱統(tǒng)一轉(zhuǎn)換為小寫(xiě)。 - 目標(biāo)存儲(chǔ):
MySQL數(shù)據(jù)庫(kù)(用戶表,字段:id、name、age、email)。
數(shù)據(jù)庫(kù)配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/batch_demo?useSSL=false&serverTimezone=UTC
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update # 自動(dòng)創(chuàng)建表結(jié)構(gòu)
batch:
jdbc:
initialize-schema: always # 自動(dòng)初始化JobRepository所需表核心代碼實(shí)現(xiàn)
定義數(shù)據(jù)模型
@Entity
@Table(name = "user")
public class User {
@Id
private Long id;
private String name;
private Integer age;
private String email;
// 構(gòu)造函數(shù)、getter、setter省略
}實(shí)現(xiàn) ItemReader(讀取 CSV)
FlatFileItemReader讀取CSV文件,通過(guò)DefaultLineMapper解析行數(shù)據(jù):
@Bean
public FlatFileItemReader<User> csvItemReader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
// 設(shè)置CSV文件路徑(實(shí)際可通過(guò)配置注入)
reader.setResource(new ClassPathResource("users.csv"));
// 跳過(guò)表頭(若CSV無(wú)表頭可省略)
reader.setLinesToSkip(0);
// 解析行數(shù)據(jù):按逗號(hào)分隔,映射到User對(duì)象
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
// 分隔符配置(CSV用逗號(hào)分隔)
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "age", "email"); // 對(duì)應(yīng)CSV列名
// 字段映射到實(shí)體類
BeanWrapperFieldSetMapper<User> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(User.class);
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
reader.setLineMapper(lineMapper);
return reader;
}實(shí)現(xiàn) ItemProcessor(數(shù)據(jù)清洗)
自定義處理器,實(shí)現(xiàn)過(guò)濾和轉(zhuǎn)換邏輯:
@Bean
public ItemProcessor<User, User> userProcessor() {
return user -> {
// 過(guò)濾年齡<18的用戶(返回null表示過(guò)濾)
if (user.getAge() < 18) {
return null;
}
// 郵箱轉(zhuǎn)換為小寫(xiě)
user.setEmail(user.getEmail().toLowerCase());
return user;
};
}實(shí)現(xiàn) ItemWriter(寫(xiě)入數(shù)據(jù)庫(kù))
使用JpaItemWriter(基于JPA)將數(shù)據(jù)寫(xiě)入MySQL:
@Bean
public JpaItemWriter<User> jpaItemWriter(EntityManagerFactory entityManagerFactory) {
JpaItemWriter<User> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory); // 注入JPA實(shí)體管理器
return writer;
}如果使用Mybatis,可以自定義ItemWriter
@Component
public class MyBatisPlusItemWriter implements ItemWriter<User> {
@Autowired
private UserMapper userMapper;
@Override
public void write(List<? extends User> items) throws Exception {
// 使用MyBatis-Plus的批量插入方法
userMapper.insertBatchSomeColumn(items);
}
}定義 Step 和 Job
通過(guò)JobBuilderFactory和StepBuilderFactory構(gòu)建任務(wù)流程:
@Configuration
@EnableBatchProcessing // 開(kāi)啟批處理支持
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 定義Step(包含reader、processor、writer)
@Bean
public Step csvToDbStep() {
return stepBuilderFactory.get("csvToDbStep")
// 每處理10條數(shù)據(jù)提交一次事務(wù)(根據(jù)數(shù)據(jù)量調(diào)整)
.<User, User>chunk(10)
.reader(csvItemReader())
.processor(userProcessor())
.writer(jpaItemWriter(entityManagerFactory))
.build();
}
// 定義Job(包含一個(gè)Step)
@Bean
public Job userMigrationJob() {
return jobBuilderFactory.get("userMigrationJob")
.start(csvToDbStep()) // 啟動(dòng)Step
.build();
}
}啟動(dòng) Job
通過(guò)JobLauncher觸發(fā)任務(wù)執(zhí)行(可在啟動(dòng)類或控制器中調(diào)用):
@SpringBootApplication
public class BatchApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job userMigrationJob;
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// 啟動(dòng)Job(可傳入?yún)?shù),如文件路徑)
JobParameters parameters = new JobParametersBuilder()
.addString("jobId", String.valueOf(System.currentTimeMillis())) // 唯一標(biāo)識(shí),避免重復(fù)執(zhí)行
.toJobParameters();
jobLauncher.run(userMigrationJob, parameters);
}
}進(jìn)階特性:容錯(cuò)與并行處理
容錯(cuò)機(jī)制
通過(guò)Step配置實(shí)現(xiàn)錯(cuò)誤處理:
@Bean
public Step faultTolerantStep() {
return stepBuilderFactory.get("faultTolerantStep")
.<User, User>chunk(10)
.reader(csvItemReader())
.processor(userProcessor())
.writer(jpaItemWriter(entityManagerFactory))
// 跳過(guò)年齡為空的異常數(shù)據(jù)(最多跳過(guò)10條)
.faultTolerant()
.skip(NullPointerException.class)
.skipLimit(10)
// 重試數(shù)據(jù)庫(kù)連接失敗的情況(最多重試3次)
.retry(SQLException.class)
.retryLimit(3)
.build();
}并行處理(多線程)
對(duì)大數(shù)據(jù)量任務(wù),可通過(guò)多線程提升效率(適用于無(wú)狀態(tài)任務(wù)):
@Bean
public Step parallelStep() {
return stepBuilderFactory.get("parallelStep")
.<User, User>chunk(100) // 增大批次大小
.reader(csvItemReader())
.processor(userProcessor())
.writer(jpaItemWriter(entityManagerFactory))
.taskExecutor(new SimpleAsyncTaskExecutor()) // 異步任務(wù)執(zhí)行器
.throttleLimit(5) // 最多5個(gè)線程并行
.build();
}

































