基于通用消息的持久化消息队列设计
2010-07-17郭盛兴廖建新
郭盛兴, 王 晶, 廖建新
(1. 北京邮电大学 网络与交换技术国家重点实验室, 北京 100876;2. 东信北邮信息技术有限公司, 北京 100191)
中间件是一种定义于操作系统之上,应用程序之下的一层软件,它能使应用之间进行跨网络协同工作,屏蔽了操作系统和网络协议的差异向应用提供通信服务[1]. 消息中间件支持在一个分布式应用环境中多种用途的消息交换. 它所提供的API将不同分布式环境很好地封装起来,对外提供统一的接口,使得应用能通过统一接口进行开发[2].
通用消息(component packet of realtime application process management and communication,COPART-MACO)是一个抽象得比较好的消息中间件. 对不同类型的上层消息增加同样的底层消息头,如图1,这种消息称为通用消息.
使用通用消息的进程间交互采用统一的协议,底层采用统一的方式通信,软件功能差异主要体现在高层的消息处理部分[3]. 通用消息可以满足不同进程间通信需要,但通用消息是面向无连接的,只提供了消息寻址功能,不保证通信的可靠性,对比TCP/IP协议来说,相当于实现了IP层功能. 为满足可靠性要求,提出了基于通用消息的持久化队列(Ebupt Message Queue简称EMQ)设计,EMQ由服务端和客户端组成,服务端提供集中式的消息接收、存储和转发服务,客户端可以通过服务端发送和接收消息,客户端和服务端之间的通信采用请求应答方式,服务端具有消息持久化功能.
图1 通用消息的消息结构Fig.1 COPART-MACO packet encapsulation
1 EMQ架构设计
1.1 EMQ的部署结构和组件
EMQ部署结构如图2,其中包含3个域:消息队列服务域、生产者域、消费者域.
图2 EMQ部署结构Fig.2 EMQ deployment structure
包含的组件说明:
1) ininit,消息队列的守护进程,负责启动本域内的所有其他进程,在子进程异常退出后,重新启动子进程;
2) inaccessd,控制台接入服务端进程,接入它之后,可以建立与本域内的所有进程的连接,输入控制台命令得到应答;
3) msgr,通用消息的消息分发模块,用于转发本域内的进程和外部进程之间的交互消息,根据实际需要,也可以有一个或者多个;
4) emqserver,EMQ服务端,持久化消息队列的核心,负责接收、存储和转发消息;
5) emqclient,与EMQ服务端交互的客户端,EMQ客户端分为生产者和消费者,生产者发送消息给消息队列服务端,消费者从通用消息服务端接收消息并处理.
1.2 EMQ服务端内部结构
EMQ服务端的结构如图3:
图3 持久化消息队列服务端结构Fig.3 EMQ server structure
服务端底层通信链路层是通用消息层,EmqManager管理EMQ服务端的两个最主要的数据结构,与客户端的连接EmqConnection和持久化的消息队列EmqQueue,连接与消息队列间通过队列名QueueName相互关连.
1.3 EMQ虚拟连接
通用消息是面向无连接的,所以EMQ客户端与服务端之间建立的是虚拟连接. 在通用消息的结构下,每个进程都有唯一的进程地址标识,包括三部分:域编号,功能实体编号,和进程实例编号. 一条EMQ虚拟连接对应一对进程地址标识. 连接建立的时候,客户端请求参数中含有队列名,服务端保存与客户端的连接、进程地址标识与队列的对应关系,客户端收到服务端连接响应后保存与服务端的连接信息. 对于JMS[4]中一个客户端进程与服务端一个队列建立多条连接的情况,如Java编程中客户端进程采用多线程,这时每个线程会与服务端的一个队列有一个虚拟连接,将每一个线程作为通用消息的一个实例.
1.4 EMQ持久化队列
考虑通用性,持久化队列设计成相对独立的模块,按照EMQ消息持久化所要求的接口能力提供操作接口. EMQ需要持久化队列实现提供的接口主要包括:createQueue(创建队列)、destroyQueue(删除队列)、enQueue(消息入队)、deQueue(消息出队)、recover(恢复队列)等. 研究给出两种持久化队列可行的实现方式:
1) 基于文件和索引的实现
定义每条消息的持久化存储结构,将通用消息整个消息的长度、消息头和消息体及消息是否已经被“消费”的标识存储到文件,在内存中保存每条消息在文件中的位置索引,读取消息时直接通过索引根据存储结构读取消息. 此方式在消息量相对比较小的情况下是一种比较好的选择.
2) 基于开源项目的实现
基于SQLite来实现持久化队列的存储,在SQLite基础上做一层封装,提供EMQ所需的接口. 基于SQLite实现的持久化队列,对于比较大的数据量也能获得比较好的性能. 除SQLite外,还可以考虑Berkeley DB、redis等其他开源项目,在其开放的API基础上封装为EMQ提供所需的接口即可.
2 消息交互流程设计
生产者与消费者间核心的消息交互流程如图4.
1) 生产者依据负荷分担策略从几个服务端连接中选择一个连接,发送消息请求;
2) 服务端收到消息后,找到消费者进程地址标识对应的持久化队列,并将消息持久化保存到队列中;
3) 服务端保存消息后将操作结果响应给生产者;
4) 生产者依据响应结果决定消息如何处理;
5) 当消费者连接对应的队列中有数据时,服务端将每次取出一条消息发送给消费者;
6) 消费者收到消息后首先给服务端消费消息响应;
图4 消息交互流程Fig.4 Diagram of message flow
7) 消费者对收到的消息进行“消费”处理.
3 接口设计
3.1 接口概述
EMQ客户端和服务端的通信多数采用请求—应答方式. 定义EMQ客户端和EMQ服务端的消息格式如图5.
图5 消息格式Fig.5 Message format
1) 操作类型:用于区分对队列的不同操作,为一字节整型;
2) 操作参数:可以为任意长度,但是由于EMQ消息使用通用消息的消息体承载,所以操作参数最大长度受通用消息一个包的最大长度限制.
3.2 操作类型
EMQ客户端与服务端之间接口消息的操作类型编码和操作参数定义如表1.
3.3 操作结果
操作结果0表示成功,非0表示失败,表2是各种主要操作结果代码及说明.
4 流量控制与服务质量
流量控制机制理想情况下可以指示任意时刻发送方发送消息的速率或数量,确保接受方的资源不被耗尽[5].
表1 操作类型定义Tab.1 Define of message operations
表2 操作结果代码定义Tab.2 Result code of operation
服务端采用基于滑动窗口的流量控制机制[6],针对每条消费者连接设置一个发送窗口,当发送出去但还没有收到响应的消息数量达到窗口大小或收到操作结果代码为“超过接收窗口大小”的应答时,服务端延迟发送消息. 对于每条生产者连接,服务端采用接收窗口来进行流量控制,当收到一条消息,如果未发送应答的消息数量达到窗口大小时,应答“超过接收窗口大小”的操作结果代码,发送方则延迟发送消息.
消费者采用服务端同样的流量控制机制和错误超时重传策略.
服务端与客户端采用请求-应答机制,当应答错误和应答超时通过消息重传来保证消息不被丢失,而且EMQ队列服务端具有持久化功能,因此可以有效保证服务端与客户端之间消息通信的服务质量.
5 结束语
为发挥通用消息的作为底层通信模块的优势,同时弥补其在服务质量方面的不足,在通用消息基础上,提出了一种消息队列EMQ的设计. EMQ服务端与客户端之间基于请求-应答方式,当请求应答错误和应答超时进行消息重传,服务端对消息持久化,这些机制可以更加有效地保证消息队列的服务质量.
在实际应用中,设计提高了通用消息的服务质量,但由于消息持久化的引入却付出了性能的代价,面对当前各种应用中越来越大量的交互消息及实时性要求,性能又是非常重要的,所以研究更高效率持久化队列,设计一种无论对小数据量还是大数据量消息交互均有良好的性能表现的队列,是后续研究重要的努力方向.