APP下载

基于响应式的服务推送框架设计

2021-08-23胡喜明

计算机工程与设计 2021年8期
关键词:消息客户端框架

胡喜明,胡 淼

(杭州电子科技大学 通信工程学院,浙江 杭州 310018)

0 引 言

随着物联网技术的发展,人们能够通过移动终端方便获取所需要的信息与服务[1]。为了保证网络设备与物理设备间可靠的传输,用户将传统的消息拉取模式转变为应用消息推送。与消息拉取模式相比,由服务器将所产生的消息推送用户,符合当前互联网流量管理以及资源分配的需求,同时在传输的实时性、高效性以及流量资源节约方面都有较大的提升[2]。物联网设备需要考虑到硬件相关参数,例如CPU性能、最大带宽容量、电池使用时间、网络最大实时流量等方面的问题,在服务推送方面需要一种功能完善、更加轻量、带宽占比小且性能高的服务推送技术[3]。

当前主流的服务推送方案有GCM服务、MQTT以及HTTP消息轮循[4]。其中HTTP轮循方式最为简单,但是在实用性、可靠性以及可扩展性方面有所欠缺。GCM服务是google研发的一款云推送框架,拥有完善的推送方案,但是受限于网络原因,无法在国内使用。MQTT协议具有低延迟、低带宽、推送速度快等优势,适合大部分物联网开发场景,然而单纯的MQTT协议本身过于简单,在消息推送、安全性管理、主题化推送方式、数据缓存性等方面开发难度大,因此需要对MQTT协议进行二次开发[5]。

本文提出一种基于Reactor-Netty+MQTT的高性能服务推送框架。项目中应用Reactor-Netty技术替代传统Netty技术对MQTT协议进行封装,借助该技术的响应式非阻塞编程、事件驱动等特性构建高性能通信服务。通过Redis进行数据缓存和Kafka实现消息代理转发进一步提高系统服务处理能力以及线程安全。

1 相关技术

1.1 MQTT协议

1.1.1 MQTT背景及特点

MQTT是由IBM公司在1999年所提出的一款基于TCP协议的发布订阅协议[6,7]。MQTT是为了在有限的带宽以及内存条件下实现消息可靠传输。该协议提供发布/订阅形式,实现单点以及一对多的发布消息推送、应用TCP协议实现有序可靠双端连接、报头以及心跳报文仅占用3个字节,将带宽传输最小化,有效降低网络通信流量、支持“最多一次”、”至少一次”,“仅一次“3种服务质量等级、支持遗嘱机制,实现客户端异常断开后,自动根据所设置的遗嘱机制,发布相关主题信息通知其它订阅的客户端用户。基于以上特点使得当前物联网的开发中绝大多数都将MQTT作为消息传递协议的首选[8]。

MQTT协议主要拥有3个角色:消息发布者、消息代理、消息订阅者[9]。消息通过推送的形式从发布者经过消息代理进行发布,此时消息已经确定了自身所对应的主题,消息订阅者可以根据主题进行相关数据的订阅[10,11]。MQTT工作模型如图1所示。

图1 MQTT工作模型

1.1.2 MQTT协议消息传输格式

MQTT协议主要由固定报头、可变报头、有效载荷3部分构成。固定报头是所有MQTT报文段中必须存在的,总长度为2字节。在第一个字节中,7-4位标识MQTT控制报文类型,目前报文类型主要分为连接、订阅、发布3种。服务质量占两个标识位,分为Qos0、Qos1、Oos23种。4-0位最为控制报文标志位,可看作一种属性参数。可变报头与有效载荷根据协议的不同进行省略[12,13]。

1.2 Reactor-Netty框架

为了应对高并发场景下流量过大情况,微软设计了一种异步的编程思想-响应式编程(reactive programming)。响应式编程是一种专注于数据流以及变化传递的异步编程模式,这也意味着可以应用编程语言进行静态和动态数据流的表示。在jdk9中java引入了Reactor的概念,Reactor-Netty作为响应式编程家族的一员,其底层基于Netty框架,对Netty进行响应式编程封装,将其转换为异步事件驱动的网络应用程序框架。Reactor-Netty内部仍然保留了Netty的主从多线程模型,拥有Netty框架的全部优势。

Reactor通过Reactor Streams中的背压,进行数据流量控制,发布者和订阅者可以进行数据流量协商,其中背压分为4种回压策略:①onBackpressureBuffer:对下游的请求数据采用缓存的形式,保证系统不会压力过大。②OnBackpressureDrop:元素就绪时,根据下游是否有未满足的request来判断是否发出当前元素。③OnBackpressureLatest:一直发送当前最新的数据。④OnBackpressureError:当前数据已经满了,再次添加请求直接报错。图2为OnBackpressureBuffer模式的背压原理图,当订阅者的消费能力远小于发布者,订阅者可以通知发布者进行服务的取消和终止功能,保证传输数据流量合理。

图2 OnBackpressureBuffer模式背压原理

2 服务推送框架设计

整体服务器分为3部分,消息推送broker模块,针对MQTT协议所定义的多种消息类型进行分类处理以及通讯传输协议的搭建。服务认证模块,为保证服务的安全性提供了接口化验证,通过将RSA算法与Redis缓存结合,实现安全性加密认证。数据缓存模块,为防止传输中服务突然宕机所导致的数据丢失,采用Redis对服务中传输的数据进行缓存。消息分发模块,针对服务中耗时操作以及大型互联网数据进行流量统计的需求,对接企业级消息队列kafka实现消息服务端与数据分发接收端的解耦,便于对海量数据的处理。系统架构如图3所示。

图3 服务推送系统架构

3 服务推送框架模块实现

3.1 服务认证模块

服务认证是对系统进行安全维护的手段,客户端在接入服务器时需要进行权限认证工作,通过对接入用户的身份验证,保证服务传输信息不被恶意获取。验证流程如图4所示。

图4 服务认证流程

为保证服务的安全性,需要对密码进行加密处理,防止由于密码泄露所导致的非法客户端接入窃取信息。这里采用RSA非对称加密算法实现公私钥加密校验。应用项目中提供的RSAUTIL类生成公钥与私钥,其内部经过大量的逻辑运算实现加密。系统需要保证每次生成的公钥与私钥唯一,这里将服务名称与当前时间进行拼接作为加密的盐值。这样每次获取生成的公钥私钥必然唯一。公钥私钥生成后以服务名称作为key值,明文密码作为value存储到Redis中。

客户端在发起连接时,需要将通过公钥加密后的密码和用户名称一并传输,服务器接收到信息后,通过私钥对传输密码进行解密与Redis中存储的密码进行对比验证。

3.2 MQTT核心Broker模块

Broker模块是推送服务的核心功能模块,关于MQTT协议的逻辑处理部分均在该模块进行。其中主要包括QoS服务质量等级选择功能、遗嘱消息功能,保留消息功能、客户端自动重连功能、心跳机制功能、MQTT/WS连接功能以及主题过滤功能。

QoS服务质量等级选择功能主要是为根据订阅端业务为其提供消息选择类型的处理,内部根据选择等级的不同实现相应逻辑处理。其中主要分为Qos1、Qos2以及Qos3这3种。

(1)Qos0:在该服务质量下,消息至多发送一次,其消息的发送完全依赖于OSI七层协议中的传输层进行维护,可能会发生消息丢失或者重复的情况。该服务质量可用于如下情况:定时推送周围环境相关数据,丢失一两组数据不会影响服务的应用且在不久后会再次推送。该情况主要用于普通APP定时推送功能,若设备当前推送数据时设备并未接入网络,此时即使再次联网也会丢失数据。

(2)Qos1:在该服务质量下,消息至少发送一次,消息发送到客户端后,客户端返回确认信息标志当前服务已送达。该服务由于有一次确认机制,因此在网络环境比较好的情况下可以实现数据正确流畅的传输,但是若网络环境出现波动使得服务端无法收到确认信息,可能会造成数据多次重发的问题。

(3)Qos2:该服务质量在三者中处于最高级别,该消息仅会发送一次。主要用在当消息丢失或者重发时,对服务端造成业务上的影响的情况。为了保证传输一次的要求,采用了两阶段确认的方式,与Qos1相比开销较大。

遗嘱消息功能主要是在发生服务断连问题的时候,订阅端无法及时了解消息发布方状态导致的持续等待问题。遗嘱消息保证了消息发布方在网络波动所导致的服务下线时能够及时通知订阅方,这里遗嘱的内容以事件监听器的形式实现,一旦出现服务断连的情况即可通过监听模式进行响应;

保留消息功能主要是对重要信息进行标记,任何标记为保留消息的内容,新接入的订阅方都可以在连接后收到,并且不需要等待消息发送方的推送,内部将消息存储在Redis缓存中,保证数据不丢失;

订阅方断线重连功能主要是为了将服务自动化管理,保证订阅方在由于网络波动导致失去连接,服务器会自动实现断线重连,降低维护成本,由Reactor-Netty内部采用的心跳检测机制,通过设定心跳间隔实现断线检测功能,并且基于事件驱动,在产生了心跳断连的情况时触发重连机制,通过轮循的方式发送重连请求保证服务的可靠性;

心跳检测功能主要是通过底层Netty的心跳检测模块,MQTT协议中有PINGREQ心跳请求,通过实现底层Rea-ctor-Netty的Handler类,对传入的PINGREQ进行管理,客户端定时发送心跳检测请求,通过Reactor-Netty内置的MONO类进行调用,该类基于事件触发,可根据实际情况进行结束处理。并且MONO类可以与java8中并发异步响应类进行转化,服务端将心跳响应包装为MONO进行返回操作;

MQTT/WS连接功能在推送服务中,不仅要与采用MQTT协议的硬件设备进行关联,同时还会与互联网设备产生连接,因此在服务器接收连接请求时会判断当前订阅端类别,根据MQTT以及WS进行分类处理。底层采用工厂设计模式对服务进行划分,根据传入的标识进行判断并选取不同的处理方式。对于MQTT协议,首先需要传入配置类,通过内置buildServer方法将配置类中信息绑定到当前服务中,为保证安全服务端可以设定SSL验证保证传输安全。在配置类中通过MONO类的链式调用,将协议的处理类,以及客户端重传机制进行绑定。WS连接与MQTT不同之处在于底层采用http传输,同时WS也不需要重连机制。服务器内部封装通过Reactor-Netty底层中与网络传输相关的Handler处理类进行半包解析、长连接以及序列化;

主题过滤功能主要是对通配符进行匹配,其中”#“符号表示只要该符号前的信息匹配,后续字段可以忽略不计。”?“表示当前的占位符,可替代任意符号。服务器通过对通配符进行匹配,实现主题个性化定制,该功能主要针对大批量服务接入时需要根据主题进行客户端分类。

borker模块中Reactor-Netty对MQTT协议进行封装,结合Reactor框架响应式以及背压的特点,提升MQTT整体性能。

以Reactor-Netty框架初始化服务功能为例:

(1)服务启动后首先调用服务端工厂方法TransportServerFactory进行服务初始化操作,MONO类进行服务的绑定以及初始化。

//初始化类,绑定相关属性信息

Mono.from(protocolFactory.getProtocol(

ProtocolType.valueOf(config.getProtocol()))

//获取传输信息

.get()>.getTransport()

//绑定Reactor内部类

.start(config,>unicastProcessor)

//netty初始化类

.map(this::>wrapper)

//错误信息打印

.doOnError(config.getThrowableConsumer());}

(2)调用底层Reactor-Netty方法进行初始化。创建Netty连接并设置定时属性。

NettyInbound inbound = connection.getInbound();

Connection c = connection.getConnection();

// 定时关闭

Disposable disposable = Mono.fromRunnable(c::dispose)

.delaySubscription(Duration.ofSeconds(10))

.subscribe();

// 设置connection

c.channel()

.attr(AttributeKeys.connectionAttributeKey)

.set(connection);

// 设置定时关闭

c.channel()

.attr(AttributeKeys.closeConnection)

.set(disposable);

(3)设置心跳检测以及遗嘱消息处理模块

//心跳检测

connection.getConnection()>.onReadIdle(config.getHeart(), () -> connection.getConnection()>.dispose())

//设置遗嘱消息

connection.getConnection()>.onDispose() ->

{

Optional.ofNullable(connection.getConnection()>.channel()>.attr(AttributeKeys.WILL_MESSAGE))>.map(Attribute::>get)

(4)根据QOS等级设置不同的消息处理类。

switch (qoS) {

//QOS1等级

case AT_LEAST_ONCE:

co.sendMessage(false, >qoS,>willMessage.isRetain(), willMessage.getTopicName(), willMessage.getCopyByteBuf())>.subscribe();

break;

//QOS2等级以及QOS3等级

case EXACTLY_ONCE:

case AT_MOST_ONCE:

co.sendMessageRetry(false,>qoS,>willMessage.>isRetain(), >willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.

subscribe();

break;

//未传输时默认传输等级

default:

co.sendMessage(false,>qoS,>willMessage.is Retain(), willMessage.getTopicName(),>willMessage.getCopyByteBuf())>.subscribe();

break;}

})));

(5)sendMessage内部绑定了handler处理类,通过对传输类型的判断设置不同的handler。

messageTypeCollection.computeIfAbsent(messageType,

type->{

//根据传入的类型进行判断所需要的handler

switch(type){

//数据传输响应

case PUBACK:

case PUBREC:

case PUBREL:

case PUBLISH:

case PUBCOMP:

return new PubHandler();

//连接请求

case CONNECT:

case DISCONNECT:

return new ConnectHandler();

//心跳检测

case PINGREQ:

return new HeartHandler();

//服务订阅

case SUBSCRIBE:

case UNSUBSCRIBE:

return new SubHandler();

}

经过以上操作,服务器初始化成功,等待相关请求。服务器在接收到请求后,根据数据请求类型进行相应处理。

3.3 服务器数据缓存模块

随着服务连接数的增多,交互数据量也逐渐增大,如果将数据都存储在内存中,将降低整体服务的性能。项目中将数据存储在Redis中,作为当前服务数据的缓存。Redis内部对存入信息按照key-value形式进行存储,保证数据的唯一性。Redis是一款Nosql数据库,在存储方面采用的单线程处理,因此访问时不需要进行并发维护,并且Redis在并发数据访问以及读取方面的性能均好于数据库。为防止Redis系统崩溃导致的数据丢失,内部开启数据持久化形式,将数据定时保存到磁盘文件中,同时采用sentinel监控下的Redis集群模型,sentinel作为服务监控组件主要功能为服务监控、故障提示、故障自动转移,通过对当前Redis集群的心跳监控,保证当Redis主节点崩溃时,系统自动在其它从服务器中进行选举,产生新的主节点,实现服务的可用性。Redis集群模式采用哈希槽算法对每个存入的key进行CRC16校验后对16 384进行取模来决定放入那个服务器中。相对于传统集群形式,该结构更容易扩展,如果扩充一个节点D,只需要将A、B、C节点中的部分槽放置在D上;如果想移除节点A,只需要将A的数据转移到B和C节点上。由于将哈希槽从一个节点移动到另一个节点不需要停止服务,只需要通过命令直接再分配,因而上述扩展不会造成集群不可用,保证了服务的高可用性。

如表1所示,项目中Redis主要对以下7个内容进行存储。

表1 Redis存储信息

publish以及pubrel:主要是为了满足Qos质量等级,质量等级的不同导致数据需要重传,因此需要对传输当前数据进行缓,等待重新发送。

session:主要是对当前客户端与服务器所建立的会话进行存储。MQTT的设计主要是为了对信号不稳定的网络提供服务,因此有时会出现网络波动导致的断开连接情况,此时客户端就可以在Session中将遗嘱信息也进行存储,服务器会定期对客户端状态进行检查,如果出现异常可以向Topic中发送遗嘱信息通知订阅方,避免服务宕机而导致的订阅方长时间等待问题。

client:订阅者根据主题的不同实现订阅服务,Redis中需要针对每个订阅者进行唯一的标识ClientId,该标识应用Redis内部的incr方法实现标识号的递增操作并与服务名进行拼接。在高并发场景下,Redis内部实现id的同步生成,保证线程安全。将ClientId与所订阅的主题进行绑定,方便后续服务推送时查询。

subWild与NotsubWild:发布方采用个性化的主题模式进行服务发布,其中可以通过#与?进行多字段匹配,与当前单独字段匹配,因此在存储的时候需要将有无通配符的主题进行分别存储。

retain:该缓存主要为了broker模块中保留信息功能,可以让新订阅的客户端获取发布方当前最新的信息,不需要进行接收等待。

3.4 消息代理分发模块

消息代理分发模式是在硬件设备之间通信的基础上,将数据传入大型互联网设备中进行处理。这里采用kafka作为消息代理分发的中间件,kafka用于海量数据的场景,通过分布式架构所提供的消息处理机制对数据进行流式存储,消费者通过订阅不同的主题可以实现毫秒级延迟的信息接收,保证数据顺序性消费。大型企业项目中可通过kafka将该框架的推送数据转发到hdfs归档或者elasticsearch做全文检测,为大型互联网设备提供高性能的数据获取途径。

4 测试结果

4.1 安全性测试

客户端输入错误的密码并向服务器发送连接请求,在已知服务器的IP地址以及运行端口号的情况下进行访问测试。测试结果如图5所示,经过rsa私钥解码显示解析后的密码与Redis中所存储的密码不符,返回错误提示。因此该推送框架可以保证服务的安全性。

图5 安全认证失败

4.2 性能测试

为检测推送服务的性能,需要对其在不同并发量情况下进行性能测试,测试采用Jmeter进行服务推送性能检测,根据并发数量以及响应时间对框架进行分析。测试设备采用两台阿里云服务器,配置见表2。

Jmeter端每间隔1 s发送一个350字节的数据,在不同的并发数情况下进行测试。如图6所示,在低并发情况下,由于系统中自身处理速度比较快,无法清楚看出MQTT与本框架的性能,但随着并发数的增长,单纯采用MQTT的响应时间上升较快,当并发数达到了2000时可以看到MQTT的响应时间已经远远超过Reactor-Netty+MQTT,同时从与Netty+MQTT的响应时间对比可以看出,该框架的响应时间也相对较低。这说明采用Reactor-Netty+MQTT的组合形式能够极大降低高并发场景下的响应时间。

图6 平均响应时间对比

从吞吐量方面考虑,图7可知当并发数达到1500左右MQTT达到了性能的瓶颈期,随着并发数的增高,服务吞吐量不再上升。而在并发数量提升到3000左右时,Netty+MQTT方案的吞吐量提升速度不如Reactor-Netty+MQTT方案快。实验结果表明在4000并发数的时候,系统仍然有较好的吞吐量,并且其上升趋势远好于原生MQTT以及Netty+MQTT方案。这说明Reactor-Netty+MQTT的方案有良好的并发性能,适用于高并发访问。

图7 平均响应速度对比

5 结束语

本文介绍了一种高性能消息推送框架,基于Reactor-Netty与MQTT相结合的方案,通过Reactor-Netty响应式编程思想对MQTT协议进行封装,实现异步高性能响应。应用Redis进行消息缓存,保证服务消息的实时性,设计kafka内容分发代理模式,可将数据推送服务与大型互联网架构相结合,实现服务数据高质量处理。经过实验验证发现,Reactor-Netty实现的消息推送服务可以提高整体的并发处理能力,相对于传统MQTT框架以及Netty框架性能有极大的提高。

猜你喜欢

消息客户端框架
框架
广义框架的不相交性
一张图看5G消息
如何看待传统媒体新闻客户端的“断舍离”?
县级台在突发事件报道中如何应用手机客户端
孵化垂直频道:新闻客户端新策略
关于原点对称的不规则Gabor框架的构造
一种基于OpenStack的云应用开发框架
消息
消息