Spring Batch(三):快速使用组件读文件数据到数据库
以下文章来源于masonlee32
正文
文章目录
- 快速使用组件-spring batch(3)读文件数据到数据库
- 1.引言
- 2.开发环境
- 3.Spring Batch提供的读-处理-写组件一览
- 3.2 ItemWriter
- 3.3 ItemProcessor
- 4.开发流程
- 4.1.2 数据库表说明
- 4.1.3 创建示例目标数据库
- 4.2 配置多数据源
- 4.2.2 配置多数据源访问
- 4.3 添加User实体
- 4.4 添加文件读取组件ItemReader
- 4.5 添加处理组件ItemProcessor
- 4.6 添加数据库写入组件ItemWriter
- 4.7 组装完整任务
- 4.8 编写测试
- 5.总结
- 参考资源
1.引言
上一篇文章《Spring Batch(二):快速了解组件之helloworld》
对Spring Batch
进行了入门级的开发,也对基本的组件有了一定的了解。但实际开发过程中,更多的是涉及文件及数据库的操作,以定时后台运行的方式,实现批处理操作。
典型操作是从文本数据(csv/txt
等文件)中读取数据,然后写入到数据库存储。如下图所示:
若需要开发此过程,可以按照上一篇文章所写的,自定义ItemReader
和ItemWriter
来实现,
但是Spring Batch
其实已经提供现成的文件读取和数据库写入的组件,开发人员可以直接使用,提高开发效率。
本文将会对文件读取和数据库写入进行实战介绍。
2.开发环境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 开发IDE: IDEA
- 构建工具Maven: 3.3.9
- 日志组件logback:1.2.3
- lombok:1.18.6
3.Spring Batch提供的读-处理-写组件一览
在使用Spring Batch
内置的读写组件时,首先我们先弄清楚有哪些组件可以用,按读、写、处理,见下面说明。
Spring Batch
已提供了比较全面的支持。
3.1 ItemReader
ItemReader | 说明 |
---|---|
ListItemReader | 读取List类型数据,只能读一次 |
ItemReaderAdapter | ItemReader适配器,可以复用现有的读操作 |
FlatFileItemReader | 读Flat类型文件 |
StaxEventItemReader | 读XML类型文件 |
JdbcCursorItemReader | 基于JDBC游标方式读数据库 |
HibernateCursorItemReader | 基于Hibernate游标方式读数据库 |
StoredProcedureItemReader | 基于存储过程读数据库 |
JpaPagingItemReader | 基于Jpa方式分页读数据库 |
JdbcPagingItemReader | 基于JDBC方式分页读数据库 |
HibernatePagingItemReader | 基于Hibernate方式分页读取数据库 |
JmsItemReader | 读取JMS队列 |
IteratorItemReader | 迭代方式的读组件 |
MultiResourceItemReader | 多文件读组件 |
MongoItemReader | 基于分布式文件存储的数据库 MongoDB读组件 |
Neo4jItemReader | 面向网络的数据库Neo4j的读组件 |
ResourcesItemReader | 基于批量资源的读组件,每次读取返回资源对象 AmqpItemReader读取AMQP队列组件 |
RepositoryItemReader | 基于 Spring Data的读组件 |
3.2 ItemWriter
| ItemWriter | 说明 | |—————————————-|———————————————————–| | FlatFileItemWriter | 写Flat类型文件 | | MultiResourceItemWriter | 多文件写组件 | | StaxEventItemWriter | 写XML类型文件 | | AmqpItemWriter | 写AMQP类型消息 | | ClassifierCompositeItemWriter | 根据 Classifier路由不同的Item到特定的ItemWriter处理 | | HiberateItemWriter | 基于Hibernate方式写数据库 | | ItemWriterAdapter | ItemWriter适配器,可以复用现有的写服务 | | JdbcBatchItemWriter | 基于JDBC方式写数据库 | | JmsItemWriter | 写JMS队列 JpaItemWriter基于Jpa方式写数据库 | | GemfireItemWriter | 基于分布式数据库Gemfire的写组件 | | SpELMappingGemfireItemWriter | 基于Spring表达式语言写分布式数据库Gemfire的写组件 | | MimeMessageItemWriter | 发送邮件的写组件 | | MongoItemWriter | 基于分布式文件存储的数据库MongoDB写组件 | | Neo4jItemWriter | 面向网络的数据库Neo4j的读组件 | | PropertyExtractingDelegatingItemWriter | 属性抽取代理写组件:通过调用给定的 Spring Bean方法执行写入,参数由Item中指定的属性字段获取作为参数 | | RepositoryItemWriter基于 | Spring Data的写组件 | | SimpleMailMessageItemWriter | 发送邮件的写组件 | | CompositeItemWriter | 条目写的组合模式,支持组装多个ItemWriter |
3.3 ItemProcessor
ItemProcessor | 说明 |
---|---|
CompositeItemProcessor | 组合处理器,可以封装多个业务处理服务 |
ItemProcessorAdapter | ItemProcessor适配器,可以复用现有的业务处理服务 |
PassThroughItemProcessor | 不做任何业务处理,直接返回读到的数据 |
ValidatingItemProcessor | 数据校验处理器,支持对数据的校验,如果校验不通过可以进行过滤掉或者通过skip的方式跳过对记录的处理 |
4.开发流程
根据当前示例,从csv
文件中读数据,写入到mysql
数据库,只需要使用FlatFileItemReader
和JdbcBatchItemWriter
即可。
下面对开发流程作简要说明。示例工程可以在这里获取
,
里面有文件resources/user-data.csv
及相应的目标数据库脚本mytest.sql
。
4.1 创建spring batch数据库
4.1.1 创建数据库并执行sql脚本
Spring Batch
的运行需要数据库的支持,以保存任务的运行状态及结果。因此需要先创建数据库。
在mysql
中创建名为my_spring_batch
的数据库。并在此数据库中执行
Spring Batch
的数据库脚本,脚本位置在spring-batch-core-4.1.2.RELEASE.jar
的jar包
中的\org\springframework\batch\core\schema-mysql.sql
也可以在 代码示例
的sql
文件夹中获取。
执行完成后,数据库表如下图所示:
4.1.2 数据库表说明
数据库共9张表,以seq
结尾的是用于生成主键的。其它6张表,以batch_job
开头的是存储任务的相关信息,
batch_step
开头的存储步骤相关信息。
-
job
与job instance
与job execution
关系 任务job
是我们说的逻辑概念,即完整的一个批处理工作,它的实例就是job instance
, 此任务信息是存储在batch_job_instance
表中。有点类似java中的类和类实例的概念,是一对多的关系。 对于每一个job instance
,执行的时候会生成记录存储在batch_job_execution
中,表示每一个job
执行的实际情况。 注意,这里job instance
和job execution
也是一对多的关系,即同一个实例有可能会执行多次(如上一次执行失败了, 后面重新再执行)。 -
batch_job_execution_context
及batch_job_execution_params
存储任务执行时需要用到的上下文(以json
格式存储)及运行时使用的参数。 -
batch_step_execution
及batch_step_execution_context
存储任务执行过程中的作业步骤及运行时上下文。
4.1.3 创建示例目标数据库
本示例只涉及一个test_user
表。创建mytest
数据库库,执行mytest.sql
脚本即可。
4.2 配置多数据源
一般来说,我们会把Spring Batch
的数据存储在独立的数据库中,而实际的应用使用的则是目标数据库,因此需要配置多数据源访问。
基于上一篇文章
的工程进行开发。
4.2.1 添加mysql数据库依赖
<dependency>
<groupId>org.springframework.boot</groupId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<scope>runtime</scope>
</dependency>
4.2.2 配置多数据源访问
Spring Boot
对多数据源的支持比较友好,配置也很简单,先在配置文件中添加数据库配置,
然后在java配置文件中添加相应的注解即可。如下:
-
application.properties
配置内容# spring batch db spring.datasource.jdbc-url=jdbc:mysql://localhost:3310/my_spring_batch?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false spring.datasource.username=root spring.datasource.password=111111 # target db spring.target-datasource.jdbc-url=jdbc:mysql://localhost:3310/mytest?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false spring.target-datasource.username=root spring.target-datasource.password=111111
-
DataSourceConfig
配置内容 新建DataSourceConfig.java
文件,配置多数据源,如下:@Configuration public class DataSourceConfig { @Bean("datasource") @ConfigurationProperties(prefix="spring.datasource") @Primary public DataSource batchDatasource() { return DataSourceBuilder.create().build(); } @Bean("targetDatasource") @ConfigurationProperties(prefix="spring.target-datasource") public DataSource targetDatasource() { return DataSourceBuilder.create().build(); } }
这样,后面就可以直接使用datasource
及targetDatasource
两个Bean进行数据库访问。
4.3 添加User实体
本实例中,读取csv
文件,转为User
实体,然后存储到数据库,因此需要先把User
这个实体作一个定义。
使用了lombok
和jpa
的注解,如下:
@Entity
@Data
@Table(name = "test_user")
public class User {
@Id
@GeneratedValue
/**
* id
*/
private Long id;
/**
* 姓名
*/
private String name;
/**
* 手机号
*/
private String phone;
//...略
}
4.4 添加文件读取组件ItemReader
使用内置的FlatFileItemReader
即可。如下:
@Bean
public ItemReader file2DbItemReader(){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return new FlatFileItemReaderBuilder<User>()
.name(funcName)
.resource(new ClassPathResource("user-data.csv"))
// .linesToSkip(1)
.delimited()
.names(new String[]{"id","name","phone","title","email","gender","date_of_birth","sys_create_time","sys_create_user","sys_update_time","sys_update_user"})
.fieldSetMapper(new UserFieldSetMapper())
.build();
}
说明:
FlatFileItemReaderBuilder
用于创建FlatFileItemReader
,设置相应的行为, 包括使用它来设置读取文件的位置(resource
),文件分隔符(默认是','
),是否跳过前面几行(linesToSkip
), 标识每一列对应的列名称(可与数据库的字段名一致)。设置文件字段与数据库实体字段的对应关系。-
设置文件字段与数据库实体字段的对应关系,使用
FieldSetMapper
来实现,其中FieldSet
代表每一行文本数据, 返回值即为实体对象。如下所示:public class UserFieldSetMapper implements FieldSetMapper
{ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { String patternYmd = "yyyy-MM-dd"; String patternYmdHms = "yyyy-MM-dd HH:mm:ss"; User user = new User(); user.setId(fieldSet.readLong("id")); user.setName(fieldSet.readString("name")); user.setPhone(fieldSet.readString("phone")); user.setTitle(fieldSet.readString("title")); user.setEmail(fieldSet.readString("email")); user.setGender(fieldSet.readString("gender")); //此字段有可能为null String dataOfBirthStr = fieldSet.readString("date_of_birth"); if(SyncConstants.STR_CSV_NULL.equals(dataOfBirthStr)){ user.setDateOfBirth(null); }else{ DateTime dateTime = DateUtil.parse(dataOfBirthStr, patternYmd); user.setDateOfBirth(dateTime.toJdkDate()); } user.setSysCreateTime(fieldSet.readDate("sys_create_time",patternYmdHms)); user.setSysCreateUser(fieldSet.readString("sys_create_user")); user.setSysUpdateTime(fieldSet.readDate("sys_update_time",patternYmdHms)); user.setSysUpdateUser(fieldSet.readString("sys_update_user")); return user; } }
4.5 添加处理组件ItemProcessor
由于csv
文本文件中的数据null
值数据标识符为\N
,因此可以在处理组件中进行处理,把标识符\N
设置为null
值。
如下所示:
@Slf4j
public class File2DbItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User user) throws Exception {
user.setPhone(checkStr(user.getPhone()));
user.setTitle(checkStr(user.getTitle()));
user.setEmail(checkStr(user.getEmail()));
user.setGender(checkStr(user.getGender()));
log.info(LogConstants.LOG_TAG + "item process: " +user.getName());
return user;
}
public String checkStr(String dataToCheck){
if(SyncConstants.STR_CSV_NULL.equals(dataToCheck)){
return null;
}
return dataToCheck;
}
}
4.6 添加数据库写入组件ItemWriter
数据库写入组件使用JdbcBatchItemWriter
即可,如下:
@Bean
public ItemWriter file2DbWriter(@Qualifier("targetDatasource") DataSource datasource){
return new JdbcBatchItemWriterBuilder<User>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user) " +
"VALUES (:id,:name,:phone,:title,:email,:gender,:dateOfBirth,:sysCreateTime,:sysCreateUser,:sysUpdateTime,:sysUpdateUser)")
.dataSource(datasource)
.build();
}
说明:
- 使用
JdbcBatchItemWriterBuilder
进行JdbcBatchItemWriter
的创建,设置插入数据库的sql语句, 同时指定数据源即可。 @Qualifier("targetDatasource") DataSource datasource
用于指定数据源- 使用
BeanPropertyItemSqlParameterSourceProvider
可以直接把读取的数据实体的属性数据作为参数填充到sql
语句中, 从而实现数据插入操作。
4.7 组装完整任务
经过上面的操作,可以使用一个java配置,把读、写、处理组装成完整的step
和job
,如下所示(详细可见示例工程文件):
File2DbBatchConfig.java
@Bean
public Job file2DbJob(Step file2DbStep,JobExecutionListener file2DbListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName)
.listener(file2DbListener)
.flow(file2DbStep)
.end().build();
}
@Bean
public Step file2DbStep(ItemReader file2DbItemReader , ItemProcessor file2DbProcessor
,ItemWriter file2DbWriter){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName)
.<User,User>chunk(10)
.reader(file2DbItemReader)
.processor(file2DbProcessor)
.writer(file2DbWriter)
.build();
}
4.8 编写测试
参考上一文章的ConsoleJobTest
,编写File2DbJobTest
文件。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,File2DbBatchConfig.class})
@Slf4j
public class File2DbJobTest {
@Autowired
private JobLauncherService jobLauncherService;
@Autowired
private Job file2DbJob;
@Test
public void testFile2DbJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//构建任务运行参数
JobParameters jobParameters = JobUtil.makeJobParameters();
//执行并显示结果
Map<String, Object> stringObjectMap = jobLauncherService.startJob(file2DbJob, jobParameters);
Assert.assertEquals(ExitStatus.COMPLETED,stringObjectMap.get(SyncConstants.STR_RETURN_EXITSTATUS));
}
}
执行后结果输出如下(exitCode=COMPLETED
):
5.总结
本文先对Spring Batch
的开箱即用的ItemReader
,ItemWriter
、ItemProcessor
作了一个简要的概览,
然后以读csv
文件,处理null值,再插入到数据库的处理逻辑为案例,介绍了Spring Batch
的数据库脚本,
FlatFileItemReader
及JdbcBatchItemWriter
的使用。希望对大家更深入的了解Spring Batch
有帮助,
并能用到实践中。
参考资源
-
刘相《Spring Batch 批处理框架》:书中对Spring Batch进行了详细的描述,本文章主要参考此书。
-
《Spring Batch - Reference Documentation》:书中对Spring Batch进行了详细的描述,本文章主要参考此书。