Spring Batch(八):mongo同步-mongo读写组件使用

2021/04/13

以下文章来源于masonlee32

正文


文章目录

1.引言

之前对Spring Batch的通过实例的方式进行了介绍,除了文件及关系型数据库的数据同步, Spring Batch的读组件(ItemReader),处理组件(ItemProcessor),写组件(ItemWriter)支持丰富的数据类型, 其中MongoItemReaderMongoItemWriter是针对mongo的读写组件,用户可以直接使用,进行Mongodb的数据读写操作。 一种比较常用的情景是从关系型数据库(如mysql)把数据同步到mongodb中, 下面通过实例对mysqlmongodb的数据同步进行讲解。本文主要讲解有关Mongodb的操作, 对于Spring Batch使用beetlsql进行关系数据库数据读取的操作请见文章《Spring Batch(五):结合beetlSql进行数据读写 》。本文的示例代码见github示例仓库

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6
  • MySQL: 5.6.26
  • Mongodb:4.0.10

3.开发流程

3.1 示例数据库及目标数据库

本示例的流程如下所示:

流程

示例工程中的sql目录有相应的关系数据库脚本,mytest.sql脚本创建一个test_user表,并有相应的测试数据。 mongodb的安装可见官方文档 ,建立相应的存放数据的Collection,本示例为mytest

3.2 添加maven依赖及配置mongodb连接地址

由于需要使用mongodb的操作,因此需要添加它的依赖。如下所示:

<dependency>
    <groupId>org.springframework.boot</groupId>
</dependency>

添加依赖后,mongodb的连接地址需配置在配置文件中,若有用户名密码,则同样需要配置。如下:

 spring.data.mongodb.uri=mongodb://192.168.222.10/mytest
# spring.data.mongodb.username=
# spring.data.mongodb.password=

3.3 编写mongodb的读写组件

按示例,共三个组件,需要的是一个读mysql数据库的组件,一个mysql数据库实体转化为mongodb的处理组件, 一个写入mongodb的写组件,代码结构如下图所示:

其中ItemReader组件和ItemProcessor组件无须多讲,可参考之前的文章,这里主要讲一下mongodbItemWriter, 此写入组件通过继承MongoItemWriter,编写自己的逻辑即可,而Spring Batch提供的mongodb写操作, 是在初始化ItemWriter时,通过MongoOperations引入的,因此,MongoBatchConfig文件中,添加以下代码:

@Bean
public ItemWriter mongoWriter(MongoOperations mongoTemplate) {
    UserItemWriter userItemWriter = new UserItemWriter();
    userItemWriter.setTemplate(mongoTemplate);
    userItemWriter.setCollection("user");
    return userItemWriter;
}

其中,MongoOperations是在初始化时注入,在自定义的UserItemWriter中,设置templatecollection即可。 若逻辑简单,不写自定义的ItemWriter,也可以直接使用MongoItemWriterBuilder,直接构建MongoItemWriter, 如下所示:

return new MongoItemWriterBuilder<MongoUser>()
                .collection("user")
                .template(mongoTemplate)
                .build(); 以上是写组件的构建,同理,对于`mongodb`的读组件,构建方式类似,只是需要注意一下动态参数的配置,如下示例代码是查询数据, 并返回`map`,参数是在构建任务时动态传入的。

@Bean
@StepScope
public MongoItemReader<Map> tweetsItemReader(MongoOperations mongoTemplate,@Value("#{jobParameters['hashTag']}") String hashtag) {
    return new MongoItemReaderBuilder<Map>()
            .name("tweetsItemReader")
            .targetType(Map.class)
            .jsonQuery("{ \"entities.hashtags.text\": { $eq: ?0 }}")
            .collection("tweets_collection")
            .parameterValues(Collections.singletonList(hashtag))
            .pageSize(10)
            .sorts(Collections.singletonMap("created_at", Sort.Direction.ASC))
            .template(mongoTemplate)
            .build();
}

4.执行结果

编写单元测试或者在Controller编写启动任务,即可进行数据同步测试,执行结果如下所示:

5.总结

本文基于Spring Batch对数据从mysqlmongodb进行数据同步,通过结合示例代码, 实现mongodb的读写组件进行编写及配置,希望需要使用Spring Batch进行关系数据库和mongodb 进行批处理任务开发的人员有帮助。

Post Directory