APP下载

基于AMQP的即时通讯系统的实现

2017-04-23张乐

电子技术与软件工程 2017年5期
关键词:即时通讯

张乐

摘 要 针对现在移动互联网复杂、信号不稳定的网络特点,设计了一套以 RabbitMQ消息中间件的即时通讯系统,在确保服务质量的情况下,减少了消息的冗余,减少了应用间的耦合关系。使移动设备更加省电省流量,适用于当今移动互联网。

【关键词】AMQP RabbitMQ 即时通讯

1 引言

近几年来,移动互联网凭借其携带方便、接入迅速、业务内容丰富等特点,取得了前所未有的高速发展。目前,移动应用服务呈现多样化,但是移动终端最基本的功能是满足用户的沟通需求,因此即时通讯类应用以其随时随地沟通的特点,满足了用户的需求。

目前常用于即时通讯的两大协议是XMPP协议和SIMPLE协议。XMPP和SIMPLE 都是根据消息体里的消息头寻址的,在消息体里面含有消息的发送者、接收者、消息路由用的会话标示信息等众多头域,使得消息体的体积变大,带宽利用率较低。XMPP 和 SIMPLE协议都是基于字符文本的通信协议,其优点是可读性强,便于抓包分析,但字符文本协议通信效率较低,并且,为了保证通信安全,采用TLS 等加密传输计算量也较大,耗能较高。

移动互联网相对传统互联网最显著的特点就是其移动性,这种移动性随之带来的是诸多不稳定、不可靠和随意性,使得移动互联网上的应用与应用服务器之间建立的网络连接可靠性较差,很难保持长时间的连接状态,因此传统的通信模型,比如 SIMPLE和XMPP 中采用的消息传递,不适合应用于移动互联网,移动互联网应用与应用服务器之间可以通过松散耦合的关系来改善移动性带来的问题。

2 AMQP及RabbitMQ

2.1 AMQP

AMQP(AdvancedMessage Queuing Protocol),即高級消息队列是一个基于消息异步处理的应用层高级消息队列协议,是消息中间件的开发标准。它的主要特征是面向消息、队列、路由,且安全、可靠。AMQP是基于客户端/代理模式,为客户端应用与消息中间件之间提供异步、安全、高效的交互。基于此协议的客户端与消息中间件可传递消息,并不受客户端和中间件不同产品、不同开发语言等条件的限制。

2.2 RabbitMQ

RabbitMQ是流行的开源消息队列系统,由以高性能、健壮以及可伸缩性出名的erlang 语言开发,是 AMQP 的标准实现。它可以支持各种消息交换的体系结构:

(1)存储转发(多个消息发送者,单个消息接收者);

(2)分布式事务(多个消息发送者,多个消息接收者);

(3)发布订阅(多个消息发送者,多个消息接收者);

(4)基于内容的路由(多个消息发送者,多个消息接收者);

(5)文件传输队列(多个消息发送者,多个消息接收者);

(6)点对点连接(单个消息发送者,单个消息接收者)。

RabbitMQ的主要特性包括如下方面:

(1)定义、队列、通道等概念,使对消息投递的控制更加精准灵活;

(2)支持高可用,以及消息和队列的持久化;

(3)支持集群,允许节点动态变化和节点故障;

(4)支持众多语言的客户端,包括Java、C/C++、php、.Netpython等;

(5)高性能、消息吞吐量大。

生产者发送消息到RabbitMQ 服务器,在服务器内部,用户创建交换机和队列,通过绑定规则将两者联系在一起。交换机负责分发消息,根据类型绑定的不同分发策略有区别。消息最后来到队列中,等待消费者取走。

3 即时通讯系统的设计

本文提出一种以RabbitMQ为消息中间件,采用发布/订阅模型作为消息传输机制,以protobuf作为数据传输格式。采用C/S 体系结构,将消息存储与转发,确保消息不重不漏,充分的适应移动互联网的一套即时通讯的方案。

本文的消息系统可以看做是一个发布/订阅系统,有消息生产者publisher,消息中转者RabbitMQ Server,消息处理者Message Server,以及消息消费者Receiver.这个系统中的publisher和receiver共同构成了一个channel。

4 消息流程

4.1 发送流程

如图1所示 。

(1)客户端A发送一条消息内容为send_client_uId【发送者id】 , channel name ,msg time【消息发送时间,精确到秒】,msg content【消息内容】,保存在本地的SqliteSQL数据库,然后用protobuf序列化后发发个RabbitMQ。

(2)Message服务器的消息队列通过RabbitMQ收到来自客户端A的消息,反序列化。

(3)Message服务器收到消息后以channel name+ msg time为key到本地消息缓存中查询消息是否已经存在,如果存在则终止消息流程,通过RabbitMQ服务器发送"duplicate msg"这个msg ack 给客户端A,否则继续。

(4)Message服务器到Counter服务器(消息计数器,为每个text等类型的消息分配msg id)以channel name为key查询其最新的msg id,把msg id自增一后作为这条消息的id。

(5)Message服务器把分配好id的消息插入本地msg cache和msg DB中。

(6)Message服务器给客户端A返回ack, 内容为msg id , msg time , channel name。

(7)客户端A收到ack包后终止消息流程,并删除本地数据库中的数据。如果在发送流程超时后仍未收到消息则转到步骤1进行重试,并计算重试次。

(8)如果重试次数超过两次依然失败则提示“系统繁忙” or “网络环境不佳,请稍后再尝试发送”等,终止消息发送流程。

4.2 接收流程

Message服务器到RabbitMQ中检测Channel中的receive_client是否在线,如果在线则将消息发送出去。

4.3 心跳发送流程

(1)客户端发送心跳包,内容为{client_uId, network type, list{channel name:newest channel msg id} },即心跳包要上报客户端所在的所有channel,以及本地历史消息记录中每个channel最新的消息的id;心跳包转给专门处理心跳逻辑的心跳服务器。

(2)心跳服务器收到心跳包后到Counter服务器循环查询每个channel的最新消息id,如果客戶端上报的id与这个id不等,就发送一条消息通知Message服务器,消息内容为{publish_uId, channel name, client newest msg id of channel【channel内的最新消息Id】}。

(3)Message服务器收到这条消息后,重新启动消息下发逻辑,到缓存中取出所有的大于{client newest msg id of channel}的id列表。

(4)Message服务器依据list中的id到消息存储服务器中依次取出每条消息。

(5)Message服务器把这些消息作为"未读消息"下发给客户端。

(6)心跳服务器给客户端下发heartbeat ack包,数据包括其所在的每个channel的最新消息的msg id。

(7)客户端收到heartbeat ack包后,依据每个channel的最新的msg id与本地消息缓存中对应的channel的最新消息id做对比,如果id不等,客户端可以启动拉取消息流程。

5 结束语

本文简单介绍了基于AMQP协议的标准实现RabbitMQ,并提出了以开源跨平台的RabbitMQ为消息中间件,松耦合的发布/订阅模型为通讯方式,protocolBuffer为数据传输格式的一套完备即时通讯系统设计,详细叙述了消息下发的技术流程,同时保证了消息不重不漏。从根本上解决了由于消息冗余而占用大量的带宽利用率,使得该方案更加适合移动互联网复杂的网络环境。下一步的工作将着重研究在复杂的移动网络下,用于保持连接的心跳包的发送时间间隔的方案和消息传递时的安全问题。

参考文献

[1]王默涵.面向移动互联网的Presence/IM机制的设计与实现[D].小型微型计算机系统,2015(04).

[2]高晓婷.基于AMQP的信息发布与订阅[D].2013(10).

[3]RabbitMQ与AMQP协议详解.http://www.cnblogs.com/frankyou/p/5283539.html

[4]Rabbit MQ[EB/OL].http://www.rabbitmq.com/.z

猜你喜欢

即时通讯
即时通讯在高校体育教学中的应用研究
民事诉讼中即时通讯记录的证据采用进路
即时通讯工具的发展对人际交往的影响分析
即时通讯软件发展模型的实证研究
科学技术哲学视域下的即时通讯
一种基于Java的IM即时通讯软件的设计与实现
用WAP手机上QQ