APP下载

基于商用多媒体云平台的终端推送服务设计

2017-06-27王钰叶徳建

微型电脑应用 2017年6期
关键词:流水指令消息

王钰, 叶徳建

(1. 复旦大学 软件学院, 上海 201203; 2. 网络信息安全审计与监控教育部工程研究中心, 上海 201203)

基于商用多媒体云平台的终端推送服务设计

王钰1, 叶徳建2

(1. 复旦大学 软件学院, 上海 201203; 2. 网络信息安全审计与监控教育部工程研究中心, 上海 201203)

从商用多媒体云平台的业务需求出发,设计了一种终端推送服务。将推送流水的存储、推送消息的发送和推送消息的确认分成3个不同的模块分别完成,实现点对点推送,同时上层服务管理终端分组,保证了推送消息的可靠到达。实验证明,该推送服务的内存占用稳定,推送效率高,吞吐量高,能满足商用多媒体云平台的点对点分组推送功能。

终端推送; 消息可靠到达; 点对点推送

0 引言

互联网时代,消息推送服务[1]在我们身边得到了广泛的应用。推送服务指的是客户端与服务器之间保持长连接心跳,服务器有新消息需要发送时,主动向客户端推送信息,相比客户端的定时请求服务,这种方式减少了交互次数,提高了推送效率。

近年来,在酒店VOD、校园OTT-TV、户外广告等行业中,以视频业务和信息发布业务为主的商用多媒体系统的应用越来越广泛。随着云计算技术的发展,基于云平台构建的商用多媒体系统成为了主流。

终端推送服务是商用多媒体云平台[2]面向终端的重要服务,信息发布系统、OTT系统等均依靠该服务将广告、节目单等信息推送到指定终端设备。在云平台的终端推送[3]方面,运营商往往需要向不同的终端推送不同的内容,例如商场的信息发布系统可在多个屏幕展示不同的产品介绍,推送内容大多是运营商需要发布的重要信息,不允许信息丢失。

文献[1]中采用Netty框架开发,对传统网络服务器采用的阻塞IO开发引起的大量客户情况下推送服务中CPU利用效率低下的问题,但是并没有解决推送服务中信息丢失的问题。文献[4]是多媒体终端推送业务功能的研究,但未能解决商用多媒体云平台推送服务的点对点推送需求。

目前可用的推送技术主要基于MQTT[5]或XMPP协议,二者均使用长连接实现推送。

MQTT协议,是一个轻量级的基于“发布/订阅”模式的消息传输协议,协议开放,扩展性强,提供一对多的消息发布,适用于带宽比较低或者不稳定的网络环境。

XMPP[6]协议,是用于实时通信的协议,通信报文格式基于XML,实时通信技术和推送的底层技术是一样的。该协议可实现点对点推送,但其基于XML的通信报文数据传输量较大,为实时通信而设计的XMPP对于推送冗余功能过多。在商用多媒体系统中,各种云服务之间的业务数据需要互相关联,而XMPP服务器的集群部署模式下只支持单一数据库,无法分库分表,数据相对封闭。在XMPP协议中,各服务器不存储在线消息的发送流水,当在线功能由于异常(如服务器负载过高或突然故障)导致发送失败时,缺乏容错机制,无法保证消息的可靠到达。

综上,MQTT协议需要为每个终端分配唯一的Topic来实现点对点推送,管理不便,XMPP协议可用于点对点推送,但协议内容冗余,传输的数据量大,数据模型相对封闭,消息容易丢失。所以,它们均不适用于商用多媒体云平台的终端推送服务。商用多媒体云平台终端推送服务的需求是:实现点对点推送,终端推送服务本身不管理终端分组,上层服务基于点对点推送可以方便实现分组管理及分组发送;保证推送消息至少成功到达终端一次。基于此需求,我们针对该商用多媒体云平台设计一个定制的推送协议,基于Exchange ActiveSync[7]协议改进。

1 背 景

1.1 可靠消息队列RocketMQ

RocketMQ是一款分布式消息中间件,基于消息队列模式,持久化存储队列,消息顺序可靠,实时消息订阅,具有高可靠、高性能、易扩展的特点。

RocketMQ分为NameServer和Broker两部分,NameServer存储了消息队列中所有的配置信息,Broker则负责消息的存储转发。RocketMQ使用“Topic/Queue”的模型,一个Topic是多个队列的集合,平均部署在多个Broker上,增加Broker数量就可以直接提高吞吐量,易扩展的同时实现负载均衡。每个Broker分为Master节点和Slave节点,Master负责接收、存储和投递消息,Slave则负责进行冗余备份,从而提升可靠性。生产者和Broker之间保持心跳连接,从而在某个Broker故障时,自动选择其他Broker发送消息,消费者从该Broker的Slave节点继续接收消息,提高可用性。RocketMQ采用高并发的方式提高吞吐量,消息消费失败不会阻塞后续消息的投递,性能很高但不保证消息严格有序。

1.2 Java的Netty框架

Netty[8]是基于Java非阻塞IO的网络编程框架,该框架将非阻塞IO与IO复用结合起来,具有较高的网络传输效率,同时还提供了基于事件响应和回调方法的异步编程模型,是目前主流的Java网络编程框架。使用Netty框架可以极大简化选择器监听、连接创建、心跳处理、连接异常处理等各个环节的开发工作。

1.3 Memcache缓存

Memcache[9]是开源的基于一致哈希的分布式键值系统,数据全部存储于内存中,常用作系统缓存,读写速度快,当节点故障时,相邻节点的替代提高了其可用性。

2 设 计

2.1 架构设计

终端推送服务要保证推送消息可靠到达终端,需要在异常情况下对丢失的推送消息进行记录并重试,因此终端推送需要存储所有的推送发送流水,终端在收到推送消息之后要反馈确认,从而确定丢失的推送。但是若在发送推送过程中存储发送流水,则会降低终端推送的吞吐量。本文在终端推送服务的设计中,将推送流水的存储、推送消息的发送和推送消息的确认分成3个不同的模块分别完成,从而将3部分操作解耦。在3个模块中,可以调节部署的服务器数量来维持系统整体吞吐量的平衡。

终端推送服务的整体架构,如图1所示。

整体架构分为3部分:

门户模块(Facade模块):接收来自其他服务的推送请求,通知相应的broker模块发送推送消息,保存和维护终端推送服务的状态(终端与Broker之间的映射关系、终端在线状态等),保存推送消息的发送流水;

图1 终端推送服务的整体架构

推送模块(Broker模块):接收来自Facade模块的请求,将推送消息发送给指定终端;

消息确认模块:接收来自终端的确认消息,更新推送消息发送流水的状态;

3个模块的职责及维护的数据,如表1所示。

表1 推送服务各模块的职责

其中,每台Broker服务器都存储了终端MAC地址和终端Channel连接之间的映射关系,这部分数据只存储在内存中。Facade模块在终端上下线时接收终端发来的RocketMQ消息,之后对终端与Broker模块中各服务器的映射关系进行修改和保存。此处采用异步的RocketMQ消息,其他云服务也可以订阅终端上下线的该消息,例如:云平台的运营服务可以通过订阅各个终端的上下线消息来计算终端的活跃时间。Facade模块还负责接收分布式定时任务的通知,查询发送失败的推送消息流水,通过不断重试进行异步补偿,直至推送消息被确认。

2.2 协议设计

推送协议,如表2所示。

该推送协议设计十分精简,共有4种指令:login指令是终端在发起长连接时,发送给推送服务的第一条指令,携带终端在开机认证过程中生成的token等信息,推送服务可以调用终端的管理认证服务验证合法性,如果认证失败或者连接建立后未发送login指令,则推送服务关闭连接,认证成功则云平台通过推送服务返回认证结果给终端;heart指令用于维护终端和云平台之间的推送服务的长连接,如果任何一方在一定时间内未收到心跳则主动关闭连接,终端开始重连;push指令是在长连接中发送给终端的指令,执行推送操作,该指令包含终端MAC地址、推送消息内容体等;ack指令是终端在收到push指令之后回复给推送服务的确认命令,推送服务在收到ack指令之后记录相应终端的push消息为到达状态。

表2 推送协议

整体协议由长连接和HTTP两部分组成,通过长连接来发送推送消息的内容,通过HTTP来确认推送消息的到达。这种设计极大地降低了长连接管理方面的难度,管理长连接的服务器不需要记录推送消息是否到达,提高了推送效率。对于未收到确认的推送消息,由分布式定时任务异步补偿不断重试直到确认为止。

2.3 终端、Broker模块及Facade模块之间的交互

Broker模块使用Java的Netty框架实现,Netty框架将网络套接字封装成Channel类型,后文将使用Channel表示非阻塞的Socket连接。

终端/二级服务器在上/下线时与Broker模块之间的交互顺序,如图2所示。

图2 终端与Broker模块的交互

终端通过与Broker模块的连接进行登录,完成登录认证的初始化阶段。

终端与Broker模块建立连接之后,先发送携带Token的login指令,Broker模块收到指令后调用终端认证服务检测Token的合法性,并返回认证结果,认证成功则返回Token对应终端的MAC地址。认证通过后,broker模块将终端MAC地址和Channel的映射关系保存在内存中,之后broker模块发送RocketMQ消息通知Facade终端上线,Facade模块记录并保存终端和Broker模块服务器的映射,RocketMQ消息内容为“[online]终端MAC地址+Broker模块服务器IP+时间戳”。Broker模块和终端之间始终保持心跳交互,一方超过指定时间未收到心跳包,broker模块将主动关闭终端对应的Channel,如果Channel在认证成功后被关闭,broker模块会将MAC地址和Channel的映射关系中删除对应的数据,并发送RocketMQ消息通知Facade该终端下线,该消息和上线时发送的内容类似。

Facade与Broker模块之间的交互,如图3所示。

图3 Facade模块和Broker模块的交互

Facade模块同样作为Broker模块的客户端进行登录认证,与终端不同,Facade模块登录时使用特殊的Token——字符串facade,Broker模块检测到该Token之后,对登录者的IP进行验证,Facade模块的IP地址应为内网IP,Broker模块检测出是内网IP则返回登录成功,否则返回登录失败。当Facade模块收到来自其他服务的推送调用请求时,查找终端对应的Broker模块,发送push指令给对应的Broker服务器,Broker模块在终端MAC地址和Channel的映射关系中查找到对应Channel并将push指令发送给终端。Facade模块和Broker模块之间也要维持心跳交互,如果二者之间的连接被关闭,Facade模块将不断重试连接直至成功。

2.4 Broker模块

Broker模块有几个关键工作。在收到客户端发来的报文时,首先判断发过来的指令是否为login指令,根据协议规定,客户端建立连接之后,发送的第一条必须是login指令,因此对于非login指令,处理器将关闭channel。然后,检测login指令的Token是否为字符串facade,从而区分客户端类型为终端设备或Facade模块的服务器。对于来自内部的Facade模块连接请求,通过客户端IP地址是否为内网进行认证,后对Channel做标记。对于来自外部终端设备的连接请求,Broker模块将调用终端认证服务检测该终端合法性,认证通过后,发送RocketMQ消息通知Facade模块终端上线,同时保存该MAC地址和Channel之间的映射关系,并注册Channel关闭事件的回调方法。当Channel被关闭时,将该Channel和MAC地址的映射关系删除并给Facade发送终端下线的RocketMQ消息,最后将Channel标记为外部连接并记录终端的MAC地址。以上的认证操作只在连接新建立的时候执行一次,之后通过这个Channel进行后续操作时,无需认证操作,但是Broker与客户端之间要维持心跳交互。

Broker模块在收到push指令时,检查是否由Facade模块发出和指令的正确性。然后Broker模块根据该指令中的指定的终端MAC地址查找到对应的Channel,将push指令从该Channel转发出去,如果在转发push指令的过程中发生任何异常,例如终端MAC地址对应的Channel不存在,Broker模块将简单地丢弃消息,后续由分布式定时任务进行异步补偿。

2.5 Facade模块

Broker模块的轻量型设计使得我们的推送系统具有较高的性能,而Facade模块需要处理较为重量级的工作:1) 维护终端与Broker模块服务器之间的映射关系;2) 存储推送消息发送流水;3) 执行分布式定时任务,对发送失败的push消息进行异步补偿。

Facade模块的数据库需要保存终端与Broker模块服务器之间的映射,同时存储推送消息的发送流水,数据库模型,如图4所示。

图4 Facade模块的数据库模型

其中MAC_BROKER表记录了终端MAC地址与Broker模块服务器之间的映射关系,终端MAC地址具有唯一性约束。Facade模块在发送push消息时,为了查找终端对应的Broker模块服务器,需要在MAC_BROKER表中进行大量的查询操作。为了提高查询效率,在Memcache集群中为该表建立Key-Value查询缓存,Key为终端MAC地址。每次查询先从缓存查询,查询不到再从数据库查询,并将结果写入缓存;每次更新数据库时对缓存进行更新,保持数据一致性。

MAC_MSG表记录了Facade模块对push消息的发送流水,终端MAC地址和Push消息ID联合组成唯一性约束,表中STATUS字段表示push流水状态:INIT表示未收到回复,ACK表示已收到回复。MSG表则为push消息的历史记录表。一定时间后通过定时任务将这段时间的MAC_MSG数据表迁移到历史记录表中,同时检查push流水状态,记录并报告未收到回复的流水,但是将放弃重新推送该push消息。

外围系统在调用终端推送服务时,需要在Facade模块注册push消息并获得ID,然后向Facade模块发送RocketMQ消息,内容包括push消息ID和终端MAC地址列表,在列表数量过多的情况下将拆分成多个RocketMQ消息发送给推送服务。

Facade模块的推送服务流程,如图5所示。

图5 Facade模块的流程图

Facade模块收到推送请求后,将push消息发送流水插入到数据库MAC_MSG表中,同时由另一个消费者将发送流水向业务线程池提交push任务,由业务线程池异步执行push消息推送。RocketMQ消息中如果有push消息流水未插入成功,则返回失败,RocketMQ将重新投递失败的消息,MAC_MSG表的唯一索引可以防止重复插入。业务线程池在MAC_BROKER表中查找当前push流水需要的Broker服务器IP,并通过Broker模块的客户端将push消息发送出去。

当Facade模块收到终端上下线的RocketMQ消息时,要更新MAC_BROKER表和Memcache缓存中的MAC地址和服务器IP的映射关系。终端上线时,将表中Broker字段更新为真实IP,下线时更新为OFFLINE。并同步更新Memchache缓存,比较RocketMQ消息中的时间戳和数据库记录的时间戳大小,防止更新过期的RocketMQ消息。终端上线时,Facade模块需要查询该终端是否有未确认的push消息流水记录,有则需要发送对应push消息到业务线程池。

由于Facade模块的两个消费者异步处理RocketMQ消息,一个负责落库发送流水,一个负责封装并提交push任务,若出现一条流水在落库之前已经完成了推送操作,则直接在数据库中插入该流水,并将该流水状态记录为ack。同一条流水多次发送的情况由客户端自行处理。

对于终端未成功收到push消息的情况,通过分布式定时任务进行异步补偿,执行频率为每分钟一次。Facade模块将查询MAC_MSG表中所有需要重发的push消息流水记录。当流水状态为INIT,终端处于在线状态,流水的更新时间超过一分钟这三个条件同时满足时,从MSG表中将流水的push内容体封装,并和push流水组装成pushTask提交到Facade模块的业务线程池,从而将push消息发送到终端。

3 应用与测试

商用多媒体平台终端推送服务为了达到点对点推送、分组管理发送、准确到达等需求,定制设计成推送流水的存储、推送消息的发送和推送消息的确认三个模块,以下是该终端推送服务的相关测试。

3.1 Broker模块的内存占用和推送效率测试

由于Broker模块所在的服务器会与大量的终端建立长连接,因此对Broker模块内存占用和连接数之间的关系进行了测试。在Clear商用多媒体云平台环境下,用单台虚拟机搭建了Broker模块,用软件模拟支持自定义推送协议的终端,记录Broker服务器的内存随着终端增加的变化。同时使用了相同的虚拟机搭建了常用的基于XMPP协议的开源Openfire[10]服务器,并用软件模拟终端XMPP客户端与服务器保持长连接,进行同样的测试来对比。

测试结果发现,随着终端数的增加,Broker模块内存增长并不明显,每个终端的连接平均消耗11.83KB内存;随着终端数的增加,Openfire的内存占用明显增高,平均每个连接占用146.2KB,是Broker的12倍。这是因为Openfire支持的XMPP协议较复杂,保存了很多无关数据,自带Web管理界面,因此对内存消耗较大,而Broker模块只维护连接的开销及连接与终端的映射,得益于终端推送协议的精简设计。

然后对Broker模块的推送效率进行了测试,同样以Openfire作为对比,共9000个终端,每个终端发送12条push消息,共计108000条push消息。对于Broker模块,直接向其发送push指令,不使用Facade模块是为了避免流水落库对实验结果的影响。对Openfire通过XMPP标准客户端类库Smack进行模拟,在推送过程中均没有持久化存储push消息的发送流水。如表3所示。

表3 Broker模块推送效率测试结果

Broker模块的平均吞吐量是Openfire的2.1倍,同时Broker模块发送的push消息通过异步补偿的方式确保无丢失,而Openfire的push消息到达率为93.77%,Openfire没有相应的补偿机制。

3.2 Facade模块的吞吐量测试

Facade模块负责接收推送请求,将push消息的发送流水落库,并发送push给Broker模块。Facade的吞吐量测试,如图6所示。

图6 Facade吞吐量测试

测试过程中共模拟3000个终端,向每个终端推送10条消息,实验有两次,第一次由单台(Facade模块的)虚拟机对外发送,第二次是由两台虚拟机组成的集群共同发送。

图6中发现,单台服务器的吞吐量为每秒872.1条,两台的综合吞吐量为每秒1685.4条。在push消息的发送过程汇总,Facade模块的吞吐量基本比较平滑,这是由于Facade模块用了线程池异步提交的方式,对外暴露RocketMQ的消息接口,在推送请求的接收、push流水的落库、发送push给Broker模块这3个环节中全部异步化操作,可以缓解流量高峰,起到缓冲作用。

Facade模块和Broker模块在单台服务器的吞吐量方面有较大的差距,这是由于Broker模块精简化,但Facade模块需要与数据库交互。在生产环境中,两个模块分开各自部署在集群中,需增加Facade模块的集群数量以提高系统整体的吞吐量,实验过程中发现Facade集群数与Broker集群数比值为5∶1较为合适。

4 总结

本文在设计终端推送服务的时候,结合商用多媒体云平台的行业需求特征,将存储发送流水、推送push消息和push消息确认3个环节异步解耦以提高吞吐量,主要解决了推送消息可靠到达的问题。

后续工作可以将推送服务进一步完善,提高吞吐量,在实验中改进推送效率和资源消耗问题,尝试将推送服务对其他云服务开放,提供更多分析数据,加强平台的开放性建设。

[1] 代超,邓中亮.基于Netty的面向移动终端的推送服务设计[J].软件,2015,12:002.

[2] 吉亚云, 刘新, 叶德建. 商用多媒体信息发布系统持久层设计与优化[J]. 计算机工程, 2015, 41(1): 261-265.

[3] Gudla S K, Bose J, Sunkara S, et al. A unified push notifications service for mobile devices[C]//Electronics, Computing and Communication Technologies (CONECCT), 2015 IEEE International Conference. IEEE, 2015: 1-6.

[4] 苏毅. 移动多媒体推送业务终端功能研究[D]. 北京:北京邮电大学, 2009.

[5] Thangavel D, Ma X, Valera A, et al. Performance evaluation of MQTT and CoAP via a common middleware[C]//Intelligent Sensors, Sensor Networks and Information Processing (ISSNIP), 2014 IEEE Ninth International Conference. IEEE, 2014: 1-6.

[6] 汪海占, 邸萌, 黄祥林. 基于 XMPP 协议的 Android 消息推送设计与实现[J]. 科技广场, 2015 (2): 40-46.

[7] Sinha A, Paul N, Devarajan S. Cloud based mobile device management systems and methods: U.S. Patent 9,060,239[P]. 2015-6-16.

[8] Maurer N, Wolfthal M. Netty in Action[M]. Manning Publications, 2016.

[9] Foong A, Hady F. Storage As Fast As Rest of the System[C]//Memory Workshop (IMW), 2016 IEEE 8th International. IEEE, 2016: 1-4.

[10] Sun M, Wang S, Fang Z, et al. Design of an Instant Messaging System Based on the IaaS Cloud Platform[J]. Journal of Communications, 2015, 10(9).

Design of Terminal Push Service Based on Commercial Multimedia Cloud

Wang Yu1, Ye Dejian2

(1. Software School, Fudan University, Shanghai 201203, China;2. Engineering Research Center of Cyber Security Auditing and Monitoring, Ministry of Education, Shanghai 201203, China)

In order to meet the business needs of commercial multimedia cloud platform, we design a new terminal push service which divides the push service into three parts, i.e. the storage of push record, the delivery of push message, and the confirm of the delivery. This terminal push service realizes point-to-point delivery, the management of terminal grouping of the upper service ensures the reliable arrival of the push message. According to the experiments, this terminal push service satisfies the point-to-point grouping push demands with high push efficiency, high throughput, and stable memory usage.

Terminal push service; Reliable arrival of message; Point-to-point delivery

王钰(1993-),女,江西,硕士研究生,研究方向:网络多媒体。 叶德建(1976-),男,浙江,副教授,研究方向:网络多媒体。

1007-757X(2017)06-0045-05

TP311

A

2017.04.07)

猜你喜欢

流水指令消息
流水
一张图看5G消息
流水有心
前身寄予流水,几世修到莲花?
中断与跳转操作对指令串的影响
基于汇编指令分布的恶意代码检测算法研究
消息
消息
消息
一种基于滑窗的余度指令判别算法