基于SPring Batch数据迁移处理系统的设计与实现
2016-11-19金石声李珏
金石声 李珏
[摘要]本省自动气象观测系统根据观测要素的不同分为单雨量系统、两要素系统和六要素系统。三类观测系统分别接收各自站点采集的报文进行解码入库,同时通过FTP将数据报文发送到综合处理系统再次进行解码入库。这样的处理流程不仅增加网络负担,还进行了重复的解码工作,资源利用率低。为此本文采用基于Spring Batch的框架,定时对三个不同观测系统的后台数据库的实时数据向综合处理系统的数据库进行迁移。该流程采用数据分区技术,多线程并行处理,在减少了对网络带宽的占用量的同时,降低了各个系统CPU使用量,提高了工作效率。
[关键词]Spring Batch 数据迁移 数据分区 多线程
引言
本省的气象自动站综合系统需要对已经进行过解码入库的单雨量、两要素和六要素观测报文进行集中,再次进行解码后插入综合数据库,并以此对外提供服务。这样的处理流程不仅在资源利用、网络带宽的占用都产生了很大的浪费。
通过分别读取单雨量、两要素和六要素的数据库,对已经入库的数据进行准实时的迁移,在降低网络带宽占用的同时,也能降低综合处理系统的资源占用。但是基于大量的数据读写,且重复性高工作,传统的编程方式处理不仅繁琐,且维护性低。已经Spring Batch的出现无疑是解决这种问题的一种有效工具。由于目前已经有比较成熟的开源框架支持批处理的需求,所以本方案拟选用开源框架Spring Batch。这样可以借助开源框架比较成熟的代码,减少研究的难度,加强框架扩展性,减少研发周期,加快实际应用进度,并且保证程序的稳定性。
一、Spring Batch简介
Spring batch是Spring的一个子项目,由Spring Source与Accenture(埃森哲)合作开发的批处理框架。Springhatch对编写批处理程序本身的特性进行了抽象。将批处理程序分为Job和Job Step两个部分,将处理环节定义为数据读、数据处理和数据写三个步骤。提供Job Repository来存储Job执行期间的元数据,可以在处理大量的数据时,提供日志记录/跟踪,事务管理,处理统计,资源管理等特性。此外,还提供了分区技术采用多线程方式并行处理作业。
二、系统总体架构设计
考虑到系统可能出现的单店故障,为减少处理这类故障的开发难度。开发三个独立的程序分别对单雨量、两要素和六要素三个数据库中的数据进行迁移。本文以迁移两要素数据为例进行介绍。
Spring bath的核心思想是将读取到的数据转化为Java对象,然后对对象进行操作。首先需要根据表中的字段建立相应的实体类,然后Spring Batch把从源数据库中读取的每条数据映射为对应的Java对象,由于本文只是对数据进行迁移,不需要对对象进行处理,所以将Java对象的值通过写步骤写入目标数据库。实际应用中只关注两张表中存储的数据:小时数据表(tabHourData)和分钟数据表(tabMinuteData),由于迁移这两张表没有具体的先后顺序,将这两部分工作并行处理。
Spring batch提供了XML方式进行业务流程配置,通过spht元素来提供并行作业流的定义,通过task-execution属性来定义执行的线程池,从而提高Job的执行效率。其中要定义两个不同的作业步(transferHottrTab_step和transferMinuteTab_step),每个作业步下定义了两个具体的子Job分别来完成对两张数据表的数据迁移。子Job中又分别定义了读、写两个过程来完成数据的迁移。其中为了保证执行效率定义了commit-interval,指定了从数据库读入1000条数据后进行一次写操作,这样既减少了10的访问,也提高了写入效率。关键配置如下(以transferHottrTab_Job为例):
数据迁移中读取的数据量较大,为了高时效的完成读取作业,读取数据的任务进行分区,每个分区交给不同的线程处理。该模式的优点在于分区中每一个元素的处理都能像一个普通Spring Batch任务的单步一样运行。具体关系图如3-1,将需要读取的目标数据分为了3个分区,每一个分区都有一个执行上下文Execution Context,StepExecutionSplitter根据不同的上下文生成作业步执行器,然后交给PartitionHandler来处理。应为Spring Batch默认实现了StepExecutionSplitter以及PartitionHandler。开发过程中原则上只需要实现自己分区逻辑partitioner即可。
小时表和分钟表都含有对数据观测时间的字段,因此对该字段进行分区,可以实现分区策略的共享。具体配置如下:
class=”com.xxzx.partition.DBpartition”>
作业中用于对数据库进行分区的DBpartition了实现了Spring Btach的Partitioner接口,定义具体的分区策略,将数据查询的时间进行切片,然后写入Step的执行上下文,关键代码如下:
while(start<=max){
ExecutionContext context=new ExecutionContext();
if(end>=max){
end=max;
}
context.putInt(_STARTTIME,start);
context.putInt(_ENDTIME,end);
start+=targetSize;
end+=targetSize;
resultMap.put(“partition”+(number++),context);
通过把每次任务中需要查询的时间段根据targetSize的值进行切片,意味着数据片段分配到不同的作业步中。并将_ENDTIME和_STARTYIME写入Step的执行上下文(ExecutionContext)。然后在读取数据的阶段通过读取Step执行上下文获取每个片段的统计时段。
3.2定时调度
由于Spring Batch本身并不是一个定时的调度框架。本文采用Spring本身提供的一个轻量级的调度框架SpringScheduler来实现定时调度任务。关键配置如下:
其中采用cron表达式实现每5分钟调度一次作业,并且在schedulerLauncher中完成对启动具体作业的配置。这样便将Spring Scheduler和Spring Batch结合起来完成数据迁移任务。
四、结论
本文基于Spring Batch框架采用数据分区技术、多线程并行处理的方法开发了一个数据库迁移系统完成批量数据的迁移工作,并结合Spring Scheduler实现了批处理任务的定时调度。在实际工作中优化了本省自动站数据处理系统中带宽占用率高、系统资源浪费的现状。实际工作中对本省自动站数量两要素自动站进行了测试,完成3260条数据的迁移的时间不超过30秒。完全满足实际业务中的需求。