基于Storm的实时大规模传感器监控平台的开发和实现
2019-12-12周煜敏
周煜敏 王 鹏 汪 卫
(复旦大学计算机科学技术学院 上海 200433)
0 引 言
随着云时代的到来,大数据和云计算已经吸引了越来越多的业内外人士的关注。在诸如金融、电信和大规模传感器监控等许多领域中,在线处理实时数据,也称为数据流处理也得到了越来越广泛的应用。数据流处理应用程序通常需要低延迟、快速处理和实时反馈。
在物联网的场景下,一套完整的系统由数千个传感器节点的分布式集合组成,每个传感器节点能够从环境中感测多种类型的信息并以特定频率发送数据。这就需要非常强大的分布式处理能力来满足计算的需求。
在Storm、Spark以及Flink等实时大数据计算引擎不断发展的同时,传感器的分析专家则难以掌握这些计算引擎的处理原语。因此需要用一套简单的自定义的语言支持,帮助分析专家能够简单地构建计算逻辑,这样才能够最快速地为这些专家提供可靠的数据结果。
本文在深入分析了传感器计算的需求后,发现绝大部分的计算都是基于传感器数据的数值计算,而且都是基于滑动窗口的计算,将高频的数据的统计量以秒、分钟至小时级别的频率计算,加以辅助的阈值设定和四则运算最终呈现结果。
本文试图设计一套轻量但有效的方案,以支持在大量传感器上运行的多个查询计算。由于处理是实时的,传统的静态数据平台和算法不适合传感器的流数据。同时,由于数据的高频率,还应该满足处理的高吞吐量以对应数据的摄入速率。对于传感器监测分析师,他们可以将计算需求转化为脚本,系统会自动解析并转化Storm的流处理程序,将高频数据处理之后再源源不断地产生处理结果并及时反馈给分析师。
该系统还会针对优化处理在计算的过程中出现的重复计算,衡量所涉及的通信和计算成本并通过中间结果共享的分区算法来减少网络通信已达到更好的性能。
1 相关知识
1.1 相关工作
实时计算需要一个合适的分布式平台辅助运行,社区已经开发了许多系统,例如早期的Aurora[1]、S4以及Storm[2]和Spark Streaming[3]等来处理秒级甚至毫秒级响应中的大量数据。Storm具有高度可扩展性,易于使用,并且具有低延迟和有保证的数据处理能力。同时,Storm提出了拓扑(Topology)的计算概念[4],相比于传统大数据引擎Hadoop的MapReduce更加灵活并且适合实时场景。这些特性都非常契合物联网数据处理应用的需求。与Storm不同,S4等系统无法保证每个元组都会被处理。而Spark Streaming则提出了一个新的模型,使用微批处理的方式用来近似进行分布式流处理[5],但其延迟对于实时响应来说很高,无法满足实际的应用需求。
为了在大数据流上进行计算,已经提出了许多支持复杂事件处理(CEP)的语言,包括SQL-TS[6]、Cayuga[7]等。虽然他们设计了不同的语法规则,但某些语言不适合物联网大数据的应用程序场景。在过去的研究中,也有相关的研究人员设计了一套实时处理应用辅助开发框架以简化开发人员工作[9]。而本文的工作专注于数据流上的聚合和滑动窗口计算,而上述语言和系统设计初衷是处理更复杂的流处理作业,因此在分布式集群上并没有良好的兼容性,使用这些语言会在解析子句和生成作业时会产生额外的开销。
为了实现高吞吐量,应该充分利用分布式集群。对性能的关注需要在有限的资源下完成工作。许多先前的研究已经解决了在Storm上开发的一部分优化问题。TMSH-Storm[8]有效降低了Storm的处理延迟和通信开销,然而在多查询的环境下,该方法的优势则并不明显。
之前的许多文献都讨论了物联网环境下传感器的实时处理算法,例如:基于微簇的桥梁监测数据流异常识别算法[10],主要利用主成分分析提取特征,优化异常检测的计算;基于复杂事件处理的用户需求响应性能实时监测分析,主要在复杂事件处理上实现了R算法的内嵌和可视化的仿真[11]。这些文献着重解决静态数据的上下文的处理优化而并非流处理。此外,还有一些文献作了滑动窗口相关的优化,例如:具有不同长度和不同选择谓词的滑动窗口上的聚合的多查询优化[12],然而没有利用先验知识达到更好的效果。
1.2 滑动窗口
实时查询通常适用于无界数据流,而不是静态数据集。考虑到内存限制,有必要设计用于维护流历史的摘要或概要的技术。对于大多数应用程序,数据流的最新元素比旧的元素更重要。因此这种对最近数据的偏好产生了实时流数据上的滑动窗口的表达形式。窗口的大小和滑动间隔通常使用时间间隔(基于时间)或元组数来指定(基于元组)。
在本文中滑动窗口使用时间间隔标准,时间滑动窗口W定义如下。
定义1将时间长度为Lms,每次滑动时间长度为Sms定义为滑动窗口,记作W(L,S)。
W(L,S)所对应的时间窗口如图1所示。
图1 时间滑动窗口示意图
当需要计算W(L,S)中的数据时,就需要在内存中存储时间长度为L的数据以供计算,并且每Sms就需要给出当前窗口的结果反馈。
1.3 数据格式
首先介绍一下流数据和查询的基本数据结构。每个传感器都有一个唯一的标识号。传入数据流采用(sensor_id,timestamp,data)的数据格式。它指的是在timestamp时间戳时,sensor_id对应传感器的值为data。
Timestamp的类型为long,使用的是Unix时间戳。
Data的类型包括布尔类型、整数数据和实数数据。系统会将其统一转化为单精度浮点类型进行存储和计算。
每个传感器将以静态频率发送该结构的元组。sensor_id与其频率之间的关系存储在数据库中,在优化计算时该信息会被使用。
1.4 语法规则
一位分析师可以对传感器启动一系列查询,并添加一些进一步的计算以获得检查所需的最终结果。这里将查询组称为工作流。不同的分析师希望监控不同传感器上的不同参数,从而导致许多工作流一起执行。为了消除歧义,应该为工作流提供明确的定义,并为分析师提供语言标准,为此本文设计了一套脚本语言。
本文将用户的查询划分为以下3类:
(1) 滑动窗口聚合功能;
(2) 流联合函数;
(3) 基本算术计算支持。
对于第一类,我们提供四种基本聚合功能:滑动窗口的最大值、最小值、平均值、求和运算。这4种函数每一种都需要3个参数,包括输入的sensor_id,窗口长度L和滑动间隔S。计算所产生的结果可以计为用户定义的新的流。例如:
A1=avg("S1",1 000,1 000)
该语句即代表了传感器“S1”在1秒钟的滑动窗口上的平均值,计算的结果成为一个新的流,并命名为A1。
对于第二类,本文为流提供联合函数union,其中的每个参数都是一个不同的流id,可以使用原始数据的sensor_id或者是另一个用户定义的流id。该函数还将生成一个带有新用户定义id的连接流。
对于第三类,本文提供4个算术运算(加,减,乘,除)和4个聚合函数(最大值,最小值,平均值,求和运算),接收不同的流id作为运算参数,并持续计算结果输出。与第一类计算不同,这些运算符给出结果的时间发生在任何输入参数值发生变化的时候,而不是等待窗口时间之后更新其结果。
以下脚本是工作流的另一个示例。这意味着首先计算两个传感器的平均值,并在一个10分钟的滑动窗口中以5分钟的滑动间隔计算两个传感器的连接流,然后在同一时间计算三个输出流的最大值和平均值窗口。
MD1Z=avg("8MD1-AZ",600 000,300 000);
UNI=union("8MD2-A","8MD3-A");
MD23=avg("UNI",600 000,300 000);
MD4Z=avg("8MD4-AZ",600 000,300 000);
UNIF=union("MD1Z","MD23","MD4Z");
out_MZ=max("UNIF",600 000,300 000);
out_AZ=avg("UNIF",600 000,300 000)。
2 系统架构
本节主要讲述该实时计算平台的系统架构,如图2所示。系统内部主要分为三大模块:脚本解析模块、实时计算代码生成模块和分布式实时计算模块。
图2 计算平台示意图
2.1 脚本解析模块
脚本解析模块负责解析脚本语言,提取出关键信息供后续逻辑搭建,为后续模块的查询优化提供信息。
当收到所有的脚本时,这些脚本首先通过语法分析模块生成抽象语法树,然后再通过脚本所携带的额外信息通过计算图生成模块进一步生成计算图。为了使分析计算组组之间的时间序列计算的计算能够共享,需要将计算组中的每一个语句,也就是每个计算,当成一个节点看待,而运算符和与该运算所用到的底层所有其他运算之间形成有向边,对于用户所给出的计算组进行一个有向图状的描述,从而对于相同的操作可以进行有效的合并。
本模块通过ANTLR的解析,能够识别出对于相同传感器的聚集操作,对于其中不同的窗口进行最大公约数的合并计算,以最大限度地节省不必要的计算。由于减少了一些重复互相有交集的时间片段数据存储,因此在桥梁传感器网络监测的高频率数据流的应用情景下,合并切分窗口来进行分析计算会节省不少内存消耗。
这些信息将通过信息收集模块将代码所需信息存储起来,以便后续使用。
在1.4节中描述的脚本语言简易的语法使得以前需要使用成千上万行的Storm代码才能完成的查询逻辑,只需要几十条语句便可以完成。当用户遇到查询需求变更的时候,用户只需要把简短的几十条语句作轻微的修改,然后由实时计算代码生成模块重新生成可以执行的Storm代码,进行部署和运行。
2.2 实时计算代码生成模块
实时计算代码生成模块通过Java反射机制,根据用户的脚本需求,将计算图进一步转化为Storm的Bolt具体的处理代码。
上一模块产生的计算图的结果,分别经过代码生成模块和优化分区结果生成模块的解析、相应的源代码和分区结果。
其中代码生成模块主要运用Java语言的反射机制,将计算图的逻辑转换成相对应的Java函数,并配置对应的参数。而每一种函数都对应一段具体的Storm原语的计算逻辑。
对于优化分区结果生成模块而言,由于多个脚本之间存在重复的查询语句,因此代码生成模块中还包含了查询共享发现模块。该模块负责把脚本中存在的重复查询语句组进行合并去重,减少数据流元组在网络中的重复传输以及在集群中的重复计算。本文采用的查询共享模块的算法架构和具体实现如下:
对于一个分布式集群,如果所有的计算能够相对平均的分配到每一个计算节点上去,那么集群的计算能力就能够得到最大程度的发挥,计算的吞吐量也得以提升。而实际情况中,在一个工作流中通常存在对于相同的流的计算的情况,如果这些计算能够分区在一起将会共享计算结果,减少重复的计算,从而获得更好的性能。同时,如果两个不同的工作流程共享同一个传感器计算或甚至相同的窗口聚集计算,那将这两个工作流程合并在一起也能降低通信成本,从而提高性能。
基于以上想法,采用启发式算法进行分区优化见算法1。
算法1分区优化算法
输入:分布式工作节点个数n,计算图G,数据库中存储的传感器数据频率Freq[]
输出:每个传感器的分区结果Map:Partition
for i :=1 to n
W[i] :=0
//工作节点负载
foreach G的子图G’
load[G’] := 0
foreach G’中所有传感器sensor_id
load[G’] :=load[G’]+freq[sensor_id]
Arrays.sort(load)
foreach G的子图G’(按load从大到小)
target :=W数组中最小值下标
foreach G’中所有传感器sensor_id
Partition.put(sensor_id,target)
算法首先将每一个结点的计算复杂度作为该节点的权重,然后以子图的粒度进行权重的计算。获取划分算法之后,通过加权轮询算法判断子图和分区的对应关系,以达到负载均衡。
2.3 分布式实时计算框架
分布式实时计算通过分布式Storm集群实现,其Storm的拓扑结构如图3所示。
图3 Storm计算拓扑示意图
在Storm拓扑中,数据源模块将持续发送原始传感器数据的元组,在计算之前还需要一层Filter Bolt以过滤无关传感器的流式数据。在查询组中所设计到的要处理的数据种类是有限的,正如桥梁检测系统中的原始数据通道可能有几千个,而查询组中涉及到的通道却有可能只有非常少量的部分。因此本文增加了这一层过滤模块,将不必要的传感器数据从系统中过滤出去,有效减少了整个分布式系统的负载和计算压力。
数据流中的元组从一个组件发往另外一个组件需要指定发送的分组方式,默认的随机分组并不能有效地解决大规模传感器场景下数据不均衡所带来的性能压力。2.2节所述的分区优化算法帮助系统产生了负载更均衡的数据分区对应关系。我们利用了Storm系统提供的CustomGrouping API,将优化算法输出的Map
Calc Bolt接收代码生成模块传递的参数和代码,能够保证运算严格按照用户脚本所定义的窗口运行。而上游的优化分组策略能够进一步降低网络传输的代价,提升整个系统的计算效率。
Result Bolt和Calc Bolt的原理很类似,它会接收Calc Bolt的计算结果,以相对较低的负载完成上层结果的计算并在命令行进行输出。不同的是,Result Bolt的计算在接收数据的瞬间触发而非等待窗口到达。
本系统结合物联网传感器计算的窗口聚合计算占比较大,计算同质性较大等特点,完成了一个基于匹配的查询优化算法,使得对于海量流式数据在分布式系统中的处理更加平衡,从而节约资源,提高查询效率和性能。
3 实 验
3.1 测试集群
实验环境使用由1个Master和4个Slave组成的Storm集群。每个节点都有64 GB内存,6×2.0 GHz(Intel Xeon E5-2620)CPU和6 TB磁盘空间。所有节点都通过1 GB以太网连接。
3.2 数据集
实验使用真实的上海市的大桥传感器数据,传感器种类达21种,总数量达到了1 000,其中大多数传感器的数据传输速率达到了20 Hz以上,每秒钟传输的数据量约30 MB。数据通过传感器采集系统的加密socket协议进入系统。
工作流程由经验丰富的工程师设计,因此监控结果在实际应用中具有重要意义。此外,工程师来自不同的领域,包括一些交叉领域。不同类型的传感器以复杂的方式使用。本文从工程师那里收集了1 024个不同的工作流程。
3.3 实验结果及分析
通过套接字获取数据源并将其放入Apache Kafka[13]作为Spout的数据生成器。作为对比实验,本文在sensor_id上使用fieldGrouping来对所有数据进行分区(图例中naive算法),而Storm的拓扑结构保持不变。这样,所有的传感器会以随机的方式分配到不同的计算节点上进行运算,可以有效地检验本文所述优化和算法的有效性。
本文使用两个性能指标:通信成本和节省的代码行。通信成本通过每分钟Filter Bolt与Calc Bolt任务之间传递的数据单元的数量来衡量。节省的代码行是将脚本语言的行数和直接在Storm上编写代码执行所有流处理逻辑的代码行数的比较,该参数可以直观地测量为传感器监测专家节省的工作量。
本文使用两个评估参数:计算涉及传感器的数量n和计算中共享的传感器的数量ns。n可以反映工作量的复杂性,而ns可以反映可以共享的数据量,也就是整个计算图的连接性。
系统的通信成本远低于对比方法,在各种计算量的实验中,平均提高了20%,并且特别在大计算量的实验中更显著(见图4)。这是因为当计算量更大时,会存在更多的重复计算的优化空间,证明了分区算法的优势。
图4 网络传输量对比实验结果
本文使用16个相同数据发送频率的传感器进行了另一组脚本实验,并且构造了相同的计算逻辑。唯一的区别是本文改变不同的sensor_id以获得不同的ns来改变数据计算的可共享性。当ns增加时,系统的表现远远好于对比算法(参见图5),实验表明本文算法更好地利用了可利用的先验知识让计算尽可能在本地进行。可以看出,本系统在共享8个传感器的计算中,达到最多20%的网络传输减少,进一步体现了本文算法的有效性。
图5 网络传输量和计算可共享传感器数量关系实验结果
另外本文测试了直接编写Java代码来直接实现计算逻辑,并和精简的脚本语言进行比较。实验发现:如果代码不是由系统自动生成的,将会需要完成大量的重复编码工作。从表1中看出,尤其是当查询数量很大时,脚本行数相对于Java的代码行数有了极大的减少,这说明了本文提供的脚本语言为传感器监控专家节省了大量的精力。
表1 脚本优化情况比较
4 结 语
本文提出了大规模传感器流数据中的实时聚合计算框架的方法。主要贡献是提供了适合这一类计算的简单易用的脚本语言和相应的分布式计算系统。脚本语言使分析人员能够在滑动窗口的聚合的组合中构建自己的计算逻辑。同时,该系统平台可以将脚本语言解释为Storm拓扑,使用智能计算和分组方法来显著提高性能。实验证明了本文所述的系统使得传感器监控专家从编码工作中解脱,在计算大规模传感器的应用中降低了流处理的通信成本,从而能够在分布式环境中处理大量的查询。