分布式信息系统中数据交换平台设计与实现
2012-07-25李艳春焦文彬
李艳春,李 新,焦文彬
(中国科学院 计算机网络信息中心,北京100190)
0 引 言
数据的交换、信息的共享是分布式信息系统[1]中必不可少的一部分。建立数据交换平台加快信息系统之间的数据流通实现信息的共享和集成,有利于提高分布式信息系统的性能。交换的业务信息以及数据不能丢失,必须确保交换可靠性。很多分布式信息系统为每一业务系统分别开发交换接口,或者在进行数据交换时必须保证同时连接到源数据库和目标数据库,这些方式影响到系统的可移植性与二次开发能力,出现重复开发造成浪费降低了工作效率,需要为分布信息系统建立一个跨地域、跨组织、跨应用的数据交换、数据共享、数据流转的数据交换平台。直接用厂商提供的交换工具不仅成本高而且较依赖于具体厂商提供的平台,不利于系统的改造和二次开发。为了解决以上的问 题,本 文 基 于 SOA[2](service-oriented architecture)平台,遵守JMS[3](Java message service)规范,采用开源消息中间件[4-5],灵活、低成本的实现了松耦合[6]、稳定的数据交换平台。
1 JMS规范及ActiveMQ
JMS[7]是用于和面向消息的中间件相互通信的应用程序接口。JMS支持两种基本的消息传递机制。第一种机制是点到点的消息传递 PTP (point-to-point messaging),生产者的消息只能发送给一个消费者,该消息模型与消息队列有关。生产者将消息发送到消息队列,消费者从这个消息队列中收到消息。在这个消息模型中,消息只有一个消费者,如果一个消费者接收了消息,其它的消费者就不能得到这条消息。这种模型中的生产者数量没有限制,消费者也可以有多个,但收到每条消息的消费者只能有一个。生产者发送消息后,消费者可以在任意的时刻接收。消费者收到消息后进行确认,如果没有发出确认,那么这条消息可以被其它消费者继续接收。
另一种机制是发布-订阅式的消息传递P/S(publishsubscribe messaging),生产者的消息可以发送给任意数量的消费者。该消息模型与主题有关。生产者发布消息,消费者订阅相应的消息,生产者将消息和主题目标连在一起,该消息模型中,生产者和消费者的数量都没有限制,可以有多个消费者收到同一条消息。消费者对主题目标的订阅支持长期订阅和持久订阅。长期订阅模式是消费者在非活动状态时生产者发送了消息,当消费者恢复到活动状态时,同样会成功收到该消息。如果消费者注册了持久订阅,主题目标会一直保留生产者发送的消息为该消费者接收。
JMS只是定义了信息传递的规范和接口,但JMS不能实现消息的传送功能。实现JMS接口的消息中间件称为JMS Provider,现在很多著名公司都提供了JMS中间件产品,如 Apache公司的 Active MQ[8],IBM 公司的 Web-Sphere MQ[9],SUN公司的Java Sun MQ,等。不同公司的产品由于开发的语言、连接数据库的能力、能否构成服务器簇以及实现高可用性等附加功能的不同,需要根据系统的不同应用场合、系统的安全性和性能指标的要求,适当选择不同的产品,使所开发的系统达到最佳的性能价格比。如IBM公司的WebSphere MQ性能稳定,但该产品价格昂贵,在我国的一般公司、企事业等单位的使用就受到一定的限制。
本文采用基于JMS Provider的开源ActiveMQ消息中间件实现数据交换平台。ActiveMQ是Apache支持的顶级开源消息中间件。ActiveMQ能够保证交换信息的高度安全、可靠,支持消息重发和记录日志,减少编写应用的复杂度。它的主要特性:
(1)ActiveMQ提供了诸如Java,C,C++,C#,Python,PHP等多种语言的程序接口,同时支持多种应用协议编写客户端,如OpenWire,Stomp REST,WSNotification,XMPP,AMQP。
(2)支持具有持久化,XA消息,事务等属性的JMS1.1和J2EE 1.4规范。可以自动的部署到符合J2EE 1.4规范的服务器上。
(3)ActiveMQ支持Spring最新版本的相关特性,同时可以与使用Spring开发的系统进行集成。
(5) 支 持 诸 如 in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA等多种网络传送协议。
(6)从设计上支持消息持久化。
(7)ActiveMQ能够保证集群的高性能特性。
(8)支持其它应用程序调用自身内嵌的JMS provider。
(9)可以运行在各种操作系统上,如:Linux,Unix,Windows。
2 数据交换平台设计
JMS提供的消息传递机制中在PTP模式下,如果两两节点之间连接需要配置队列数量为:n* (n-1),本文实现的交换平台需要在130个节点上应用,采用该模式需要配置的队列总量为:n* (n-1)=130* (130-1)=16670个,PTP模式下的配置工作量太大。所以本文设计的数据交换平台使用持久化的P/S模式,该平台能够可靠的交换数据,各节点易于部署维护。数据交换平台为用户提供了详细的数据流转时间及交换日志信息。
2.1 交换平台总体结构
数据交换平台采用星型结构,发送节点的业务系统通过数据交换接口委托交换中心客户端程序 (MQ Client)将数据交换至交换中心,交换中心将数据传输到目标节点的客户端,目标节点的数据交换接口将客户端接收到的数据存入该节点的业务系统中。采用星型拓扑结构,各节点只需和中心交换服务器互通即可,作为用户的节点工作站,并不知道中心服务器的存在,感觉和两两直接连接一样。如果采用网状拓扑结构,每个节点之间都要互通才能够交换,不易于维护。交换平台结构如图1所示。
图1 交换平台结构
在星型结构中处于中心位置的是交换中心,交换中心是单例JMS Provider,完成各节点数据的分发处理,实现各节点间的数据交换,对各应用节点是透明的。交换中心的整体行为就像一个虚拟的中心数据库,同时又像一个交换机,负责交换数据同时记录每次交换信息。该结构属于松耦合如同星型网络拓扑结构一样,容易进行层次的结构扩展,构件出多级数据交换中结构,为以后建立地区分中心奠定基础。
2.2 数据交换过程设计
各个交换节点工作站交换接口中有3个存储介质:Outbox、Inbox、Log。Outbox用于存储发送出去的数据,Inbox用于存储接收到的数据,Log作为一个控制器,主要用于记录每次操作的日志。3个存储介质采用了消息队列的方式存储数据,将消息以可靠的方式存储,即使服务器发生故障重新启动后,消息仍然能到达目的服务器也不会丢失。
交换平台交换如图2所示。
图2 交换平台交换
(1)节点1作为发送方,发送接口将业务数据推送到发件库 (OutBox)中。
(2)发送方填写一条待发送日志信息到日志表 (Log)中,其中,最重要的信息是收件地址信息。
(3)发送轮循器扫描日志表里待发送记录,将业务数据和日志信息包装后发送至交换中心,系统记录信息上传时间。
(4)交换中心将消息交付给接收方 (节点2)。为避免接收节点偶尔会出现系统故障无法接收信息,系统设计时,保证只有消息正确发送后,才删除消息缓冲列表中的消息,从而杜绝消息的丢失。
(5)接收方将业务数据存储到中间数据库收件库 (InBox)中同时存储收件日志到日志表中,系统记录信息到达时间。
(6)接收方收件轮循器扫描收件日志。
(7)轮循器根据日志信息从收件库中下载业务数据到相应的业务系统中,系统记录信息下载时间。
2.3 交换平台数据表设计
各节点在数据表[10]EX_Client_ID记录着本节点的组织ID (ORG_ID),组织名称 (ORG_NAME),该节点的交换平台客户端地址,交换时用组织ID作为交换的源、目标地址。在各节点都有两个记录交换数据的日志信息表EX_Data_Detail和 EX _Data_Detail_Profile。EX _Data_Detail表记录要交换的数据基本信息,见表1。
表1 EX_Data_Detail字段
EX_Data_Detail_Profile表记录数据交换过程的详细信息,见表2。其中信息上传时间为业务信息到达交换中心的时间,信息发送时间为交换中心将数据进行发送的时间,信息到达时间是信息到达目标节点数据接口的时间,信息下载时间指信息通过轮循器到达业务系统的时间,信息阅读时间指应用用户阅读该交换信息的时间。追踪Trace字段的追踪需求1-6具体为:1上载、2发送、3下载、4阅读、5回复、6结束。
表2 EX_Data_Detail_Profile字段
2.4 交换的数据包格式
数据交换平台中传输的数据包括日志信息数据、交换的业务数据。日志描述交换的数据是收件记录还是发件记录,是待发送记录还是已发送记录,是待接收记录还是已接收记录,同时还包括数据的源或目标地址等信息。这类数据用一个统一JAVABEAN来描述。实际流转的业务数据是业务系统中的业务表记录。基于业务关系表的记录可以描述成行和列组成单元格的集合,也使用JAVABEAN来描述。数据交换平台将这两个JAVABEAN序列化,MQ Client将它们的组成发送到交换中心进行交换。可序列化对象包装的数据结构如图3所示。
图3 可序列化对象包装的数据结构
3 数据交换平台的主要技术
3.1 面向服务的架构——SOA
该数据交换平台在应用ActiveMQ作为交换中心基础上采用了SOA[11]开发平台进行二次及接口开发,通过将系统的各组件单元进行应用组装,并定义服务间的接口和契约构建系统。通过应用SOA架构,实现了平台系统各组件间的松耦合,提高了系统的可复用性及可扩展性。
3.2 分层结构
在基于SOA架构的前提下,该数据交换平台的系统设计采用了如图4所示的分层结构,整个系统的5个层次如下:页面层、展现层、业务层、运算层、数据层,在5个层次之间引入了 XML (extensible markup language)[12-13]数据总线技术,XML具有很强的数据扩展能力,在各个层次之间传递数据,并扩展各个层次的数据。
图4 分层结构
图4 所显示分层结构与模型—视图—控制MVC(model view control)[14]相互对应。分层结构中的页面层和展现层两个层次构成了MVC中的视图层,负责页面显示,展现层调用了业务层的数据来展示应用功能;业务层和运算层构成了控制层,用于实现具体的业务逻辑;数据层是模型层,与应用系统数据库的数据实体映射,达到数据持久管理的目的,降低应用与数据库结构和数据库类型的耦合度,提高应用系统的数据扩展能力。各个层次的数据都用XML进行传输,形成了XML数据总线。
4 数据交换平台的实现
4.1 数据交换接口
数据交换接口是各个业务系统推送、接收数据的一个入口,各个业务系统交换数据都调用这个接口。发送接口是提供收件地址及业务数据位置后通过该接口将数据发送至交换平台客户端程序。发送接口保存等待发送的数据委托交换中心客户端程序发送数据。发送接口的控制层如图5所示。
图5 交换接口逻辑层
输入的参数包括:目的地址、业务类型、业务数据所在业务表名称、主键名称及主键值、操作类型。返回的结果:1/0。
4.2 轮循器
数据交换中心将数据交换到接收节点的中间库 (Log、Inbox)中,轮循器扫描中间日志表 (Log)中待接收的记录,然后分析并保存业务数据到本节点相应的业务数据库中,同时,轮循器提供收件类型接口以适配不同的收件处理逻辑分支,比如公文收件使用公文收件处理器,表同步使用同步表机构处理器。轮循器功能在SOA开发平台的上完成,使用业务逻辑调用器捕获各模块下载业务逻辑的返回值和异常,并根据需要判断是否反馈下载失败信息。如果下载成功则设置状态位及追踪字段为3(下载),轮循器下次不再主动扫描这个状态的收件记录。控制层主要部分如图6所示。
图6 轮循器逻辑层
4.3 发送和接收功能
4.3.1 配置文件
原ActiveMQ创建连接方式是发送文件 (Send)和接收文件 (Receivemq)中都写入交换中心的IP地址及本节点的唯一标识,本文将连接进行优化,作了一个XML[15]配置文件,将公共信息在这个配置文件中单独配置,然后由JMS方式调用为Send及Receivemq程序所用。该配置文件包含在部署包中,这样新增一个节点也不需要再进行修改程序文件和配置文件。
各节点需要在XML配置文件applicationContext中获取该节点的ClientID和配置交换中心的IP。
获取本节点的唯一标识CLIENT_ID:<property name="queryPrpoertiesSql"> <value>SELECT ORGCODE AS CLIENT_ID FROM EX_Client_ID WHERE RECORDID= ‘1’</value></21》
配置交换中心的地址AMQ_SERVER_IP:
<prop key="AMQ_SERVER_IP">172.31.0.10</prop>
为JMS创建创建连接提供参数,其中端口61616是ActiveMQ默认的配置,默认原值没有修改:
property name="brokerURL">
<value>failover:(tcp://$ {AMQ_SERVER_IP }: 61616? wireFormat.maxInactivityDuration =300000&
wireFormat.maxInactivityDurationInitalDelay=60000)</value> </property>
……
<property name="clientID"value=" $ {CLIENT_ID}"/>
……
4.3.2 发送功能
发送功能主要连接数据交换中心服务器,并将数据交换到数据交换中心。ActiveMQ默认的模式是非持久模式,本文实现的交换平台增加了持久模式下的消息交换。下面是mq包中Send类中主要功能。
JMS客户端到JMS Provider的连接:
Connection connection=connectionFactory.createConnection ();
connection.start();
建立一个发送或接收消息的线程:
ActiveMQSession session = connection.createSession(false,ActiveMQSession.AUTO_ACK);
获取消息的目的地.
Destination destn = session.createQueue ("myqueue");
不持久化的主题模式下:
if(topic){dst=session.createTopic (subject);
MessageProducer:消息生产者:
MessageProducer producer = session.createProducer(destn);
持久订阅模式下:
if(persistent){
producer.setDeliveryMode
(DeliveryMode.PERSISTENT);
……
4.3.3 接收功能
接收功能主要是连接交换中心然后将数据下在到接收节点。下面是mq包中Receivemq类中的数据下载到接收节点并取出消息的主要功能。
建立消息接收者MessageConsumer:
MessageConsumer consumer=session.createConsumer(destn);
接收消息:
DBoracle db= new DBoracle ();
文本消息:
BlobMessage txtMsg= (BlobMessage)message;
大字段消息:
……
得到BLOB对象:
BLOBblob = ((OracleResultSet)rs) .getBLOB("file_blob");
建立输出流:
OutputStream out=blob.getBinaryOutputStream ();
int size=blob.getBufferSize ();
建立缓冲区
byte []buffer=new byte [size];
int len;
取出消息:
while ((len=in.read (buffer))!=-1)
out.write(buffer,0,len);
……
5 应用情况
本文实现的交换平台已经在具有130个节点某个大型分布式信息系统正式应用。每天都有公文、图纸、报表、SQL语句等各种类型数据的交换,数据交换平台能够为应用系统实时交换各种信息,并提供了应用系统的数据交换时间及交换出错后的错误信息。通过应用该数据交换平台节省了发送纸制文件的办公费用,加快各节点之间交换信息的速度,提高了办公效率。
该数据交换平台实施过程简单,交换中心服务器在LINUX操作系统上安装ActiveMQ应用程序包,交换节点只需要部署ActiveMQ应用包。使用了P/S模式,所有交换节点使用一个主题,配置的队列量为零。通过应用该平台体现出了较好的稳定性,主要体现在保证消息的收发不受计算机硬件故障和网络故障的限制,因故障未发送的消息存放在交换平台,待故障解除后准确地发送到接收端。
6 结束语
数据交换是影响分布式信息系统的性能主要因素之一。本文在介绍JMS、ActiveMQ、SOA相关技术基础上,详细说明了数据交换平台的结构设计、交换过程以及交换的数据包格式,,描述了交换平台中数据接口、轮循器以及发送接收等关键技术的实现过程。本文实现的数据交换平台具有低成本、零配置、松耦合、高稳定等特性,并在某个大型分布式信息系统中得到了很好的实际应用。
[1]SUN Yongsong,YE Feiyue.Application of dynamic data replication in distributed information system [J]. Modern Computer,2008,25 (6):147-149 (in Chinese). [孙永松,叶飞跃.动态数据复制在分布式信息系统中的应用 [J].现代计算机 (专业版),2008,25 (6):147-149.]
[2]Thomas ERL.Service-oriented architechture:concepts,technology,and design [M].China Machine Press,2007:55-60(in Chinese). [Thomas ERL.SOA概念/技术与设计 [M].北京:机械工业出版社,2007:55-60.]
[3]JAVA website [EB/OL].http://www.oracle.com/technetwork/java/jms/index.html,2008.
[4]ZHU Fang’e,CAO Baoxiang.Research and application of message queue middleware based on JMS [J].Computer Technology and Development,2008,18 (5):172-175 (in Chinese).[朱方娥,曹宝香.基于JMS的消息队列中间件的研究与实现 [J].计算机技术与发展,2008,18 (5):172-175.]
[5]WU Xu.Research and development of information exchange platform based on MQ [D].Chengdu:Southwest Jiaotong Un1versity Master Degree Thesis,2006:10-16 (in Chinese).[吴旭.基于MQ的信息交换平台开发与研究 [D].成都:西南交通大学,2006:10-16.]
[6]JIA Yi-wei,ZHOU Min.Information sharing platform in electronic government affairs [J].Computer & Digital Engineering,2009,37 (9):155-158 (in Chinese).[贾一苇,周民.实现松耦合电子政务信息共享平台实例研究 [J].计算机与数字工程,2009,37 (9):155-158.]
[7]WU Jun-yong.Information sharing and data communication in electricity power market supporting system based on JMS [J].Computer Engineering and Applications,2007,43 (13):204-206 (in Chinese).[吴俊勇.基于JMS的电力市场支持系统数据通信的实现 [J].计算机工程与应用,2007,43 (13):204-206.]
[8]Apache软件基金组织 ActiveMQ主页 [EB/OL].http://activemq.apache.org/,2008.
[9]IBM技 术 网 站 website [EB/OL].http://www-142.ibm.com/software/products/cn/zh/wmq,2008.
[10]Page,W.G.Oracel 8/8ideveloper’s handbook [M].WANG Lei,transl.Beijing:China Machine Press,2000:699-700(in Chinese).[Page,W.G.Oracel 8/8i开发使用手册 [M].王磊,译.北京:机械工业出版社,2000:699-700.]
[11]LIU Xian-mei,LIU Qian,XU Feng.Research of enterprise application integration model based on SOA [J].Computer Engineering and Design,2009,30 (16):3790-3793 (in Chinese).[刘贤梅,刘茜,徐锋.基于SOA的企业应用集成模型的 研 究 [J]. 计 算 机 工 程 与 设 计,2009,30 (16):3790-3793.]
[12]XML论坛 website [EB/OL].http://www.xml.org.cn/index.html,2008.
[13]CHEN Lin,HUANG Ye.XML-message queue markup language[J].Computer Engineering,2007,33 (19):85-87(in Chinese).[陈林,黄晔.基于XML的消息队列标记语言[J].计算机工程,2007,33 (19):85-87.]
[14]LIU Chun-hua,WANG Zhong-min.Design and implementation of remote evaluation system based on MVC design pattern[J].Computer Engineering and Design,2008,29 (13):3468-3470(in Chinese).[刘春花,王忠民.基于 MVC模式的远程评议系统的设计与实现 [J].计算机工程与设计,2008,29 (13):3468-3470.]
[15]CAI Wen-qing,LI Fan-chang.Design of data exchange model based on JMS and XML [J].Computer Engineering and Design,2007,28 (14):3529-2531 (in Chinese). [蔡文青,李凡长.基于JMS和XML的数据交换模型设计 [J].计算机工程与设计,2007,28 (14):3529-2531.]