支持通信数据查询分析的分布式计算系统
2014-10-31晁平复郑芷凌房俊华
晁平复, 郑芷凌, 房俊华, 张 蓉,2,
(1.华东师范大学 软件学院,上海 200062;2.华东师范大学 数据科学与工程学院,上海 200062)
0 引 言
在信息时代,信息通信与人们的生活密不可分.随着移动通信终端的普及和移动智能设备的发展,移动通信领域不再仅限于远距离通话以及信息通讯等基础业务,还包括互联网访问、智能导航、在线视频会议等一批新兴应用,响应这些应用需要高效的实时数据处理的数据管理平台的支持.
传统的移动通信过程其本质是为基于远程无线通信网络或有线网络的信号建立数据传输的过程.在通信请求发起以及数据传输的过程中,通信数据需要经过多个无线基站或有线中转站的接收与转发.为了防止数据丢包以及通信掉话等异常情况发生,每个基站会在数据传输的过程中输出大量的监测日志数据.移动运营商通过对日志记录的实时分析可以及时捕获通话异常并分析异常原因,实现对移动网络的故障监控.同时,深入分析与挖掘日志数据,能够获取通信网络中的热点区域、热点用户以及故障集中区域,可以协助优化通信服务质量.目前通信数据管理平台通常需要支持以下几个功能.
(1)基于计算的存储:支持多源海量数据依据数据集成、计算等数据处理操作,支持海量数据的实时存储.
(2)高效离线分析、挖掘:提供针对已存储历史数据的离线分析、数据挖掘的能力.
(3)实时查询:支持用户在线故障查询等实时查询业务.
(4)基于业务的数据存储自适应:支持容量的横向扩展以及业务功能的快速定制以适应业务扩展需要.
为了达到以上要求,通信数据管理平台必须在分布并行的环境下设计和实现高效的处理方案,设计和实现主要遇到以下几个方面的挑战.
(1)数据量庞大.随着移动通信用户数量的激增以及业务的丰富,系统在单位时间内会生成大量的通信日志数据(以一个中等城市为例,单日的通信日志数据量达到10TB量级).因此,高效的数据处理以及存储为平台的构建提出第1个挑战.
(2)数据结构复杂.为了更准确地监控并分析通信故障或用户行为,系统在用户通话过程中捕捉多种监控日志数据,监控数据指标繁多且结构复杂(每种监控数据含有几百到上千种不同属性字段,且数据结构非平面化).数据的结构化存储以及快速访问为平台构建提出第2个挑战.
(3)数据时序混乱.由于网络存在不稳定性,通信数据在传输过程中会出现大量的数据迟到、数据丢包等时序混乱现象.为了保证数据的正确性,对日志数据的分析需要实现对乱序数据的基于时序的集成,有限内存与长时间的通信信号缓存为数据处理提出又一挑战.
庞大的数据吞吐量、复杂的数据结构以及实时性任务需求使得数据的处理变得非常困难,但实时的通信日志存储、分析对提高通信服务质量以及创造商业价值提供了机会.因此,针对通信数据大数据管理平台的设计和实现是一项潜力巨大又充满挑战的工作.
传统的通信数据管理系统主要架构于基于硬盘的单机系统或分布式环境之上.由于数据存储与处理的能力有限,目前的数据处理方式主要是通过数据采样与统计,只将统计信息保存在数据库系统中,丢弃原始数据,通过对业务上的妥协来满足性能的要求.然而,随着近年来分布式系统的迅速发展以及内存技术的不断完善,通过分布式内存技术实现海量数据的高速处理与实时响应成为可能.
为了解决通信管理的问题,数据管理平台需要解决如何提升系统容量、处理能力、响应速度3个大问题.系统容量的要求需要实现平台的可扩展性.分布式架构为系统提供足够多的存储空间.并行处理架构为高效、快速计算、存储数据提供了解决手段.
近年来,MapReduce[1]分布式运算框架的提出,使得传统的数据处理业务可以便捷地移植到分布式运算环境中,以应对数据量迅速膨胀后单机系统的乏力.而开源Apache Hadoop[2]分布式系统的盛行,使得分布式系统作为大数据解决方案中最重要的一环[3],得到了广泛的认可.系统响应速度的要求需要实现高效计算.随着硬件技术水平的提升以及内存成本的下降,目前单台服务器在拥有大容量硬盘存储空间的同时,内存容量也得到了显著的提升.内存由于其远高于硬盘的数据访问速度以及较低的响应延迟,其特性非常适合应用在实时性要求较高的业务.然而,与相对发展较为成熟的硬盘分布式系统相比,分布式内存计算系统目前仍处在起步阶段,无论是基于RDD[4](Resilient Distributed Dataset,分布式弹性数据集)技术的分布式内存系统Spark[5,6],还是基于内存的查询引擎Impala[7]等,其对于应用场景以及内存技术的利用上都存在一定的局限性.
本文针对通信数据处理中存在的数据高吞吐量、结构复杂、时序混乱以及查询业务的高实时性、运算密集等特点,提出基于当前的分布式技术,设计和实现高效通信数据管理平台,支持数据的高效存储、近实时查询处理以及动态业务模型生成.从分布式文件存储数据处理技术的角度对当前分布式技术进行比较,包括基于硬盘的分布式系统Hadoop,分布式数据库HBase[8]、Cassandra,基于内存的分布式系统Spark,分布式查询引擎Impala.本文通过大量测试数据证明基于分布式平台与分布式计算模式结合内存技术支持大数据处理,而且能够提升实时查询的性能.
1 相关工作
随着大数据环境下数据量的迅速膨胀,近年来分布式技术发展速度较快,涌现了大量优秀的基于分布式环境的运算工具.然而,不同的分布式技术所具有的特点各不相同,在应用领域上也同样存在着差异.目前分布式工具主要分为3类:分布式计算系统、分布式SQL查询引擎以及分布式NoSQL数据库.
1.1 分布式计算系统
分布式计算系统通常基于不同的分布式运算框架进行构建,并围绕该框架提供一套完整的解决方案,包括分布式数据存储、资源分配以及任务调度.分布式计算系统具有优秀的数据批处理能力,在拥有大数据吞吐量的同时,对复杂逻辑的任务也提供了较好的支持.
目前较为流行的分布式计算系统主要由两部分组成:分布式文件系统以及分布式运算框架.其中分布式文件系统在确保数据容错的基础上,保证数据存储在分布式环境下读写的高效性以及存储的可扩展性.此外,分布式文件系统对于多种数据结构的兼容也保证了上层业务的灵活性.分布式运算框架作为数据处理过程的核心架构,其为使用者提供了一套运算模型,用户可以通过模块化编程来实现丰富的数据处理功能.系统通过对任务进行合理的资源调度与分配,实现分布式环境下高效的数据处理.目前主流的分布式系统包括Hadoop以及Spark.
基于MapReduce运算框架下的Hadoop系统,由于其极高的编程灵活性,可以支持复杂的数据处理逻辑.系统在实现多任务并发的同时,通过数据分块执行,单块故障重做的方法,以较小的代价实现了系统容错,从而保障集群中任意slave节点出现故障时不影响任务顺利执行.此 外,Hadoop 底 层 的 分 布 式 文 件 系 统 HDFS[9,10,11](Hadoop Distributed File System)通过多副本机制保证了数据的可靠性,集群的稳定性.而HDFS对多种存储格式的支持使其可以应对不同应用的需求,如列存储可以用于分析业务,行存储可以进行事务处理.HDFS凭借其良好的稳定性与兼容性成为目前使用最为广泛的分布式文件系统.
借鉴MapReduce思想,采用有向无环图框架的Spark系统,在继承了MapReduce框架编程灵活性的同时,将Map和Reduce操作拆分成了filter、groupby、join等多种数据集操作,优化了操作执行效率与内存占用.此外,Spark基于内存特性,针对任务的容错性、数据本地化、网络开销以及CPU资源利用率等方面进行了大量优化,使得目前Spark系统在多任务并发以及迭代运算等性能上相较于Hadoop有着较大的优势.
1.2 分布式SQL查询引擎
由于分布式计算系统较为自由的编程模式,在增加了系统灵活性的同时,也增加了其上手的难度,而分布式SQL查询引擎的出现解决了这样的问题.分布式查询引擎架构于分布式计算系统之上,在系统的外层增加SQL查询解析模块,将用户提交的SQL查询转换成计算系统的数据处理任务,为计算系统提供完善的SQL查询接口.由于其以分布式计算系统为基础,因此分布式查询引擎保留了优秀的可扩展性、容错性以及对复杂数据结构的支持.此外,分布式查询引擎针对SQL查询所具有的交互性特点提供了一系列的优化策略,为提高查询的实时性以及数据吞吐量起到了重要的作用.目前流行的分布式SQL查询引擎包括基于Hadoop系统的Hive、Impala以及基于Spark的Shark[12].
1.3 分布式NoSQL数据库
分布式计算系统与查询引擎虽然具有较高的数据处理能力,但由于其粗粒度且较为松散的数据组织方式以及缺乏高效索引结构等特点,因此无法很好地支持海量数据下高实时性要求的数据查询业务.另一方面,传统的关系型数据库由于其严谨的数据结构以及高效的索引技术,面对实时数据查询任务有着较强的性能优势,但其可扩展性较差,无法满足大数据量的存储与查询需求.分布式NoSQL数据库的出现解决了上述的问题.NoSQL数据库削弱了关系型数据库所具有的ACID性质,通过使用键值对存储等弱结构化数据的形式,在保留了关系型数据库索引技术等查询优化手段的同时,提升了其可扩展性以及对大数据量的支持.目前主流的分布式NoSQL数据库通常依据主键对数据进行排序或索引等方式,使得在海量记录中通过主键进行点查询时拥有非常优秀的性能.当前常用的分布式NoSQL数据库包括HBase、Cassandra等.
2 通信数据管理平台
针对通信日志的数据管理平台旨在计算和存储海量通信日志信息(每天数据量达到TB级,记录条数达到百亿);通过分析与处理日志信息,为用户提供实时的故障查询以及多维度的数据挖掘支持,从而提升通信平台的服务质量.
2.1 平台上下文分析
通信数据管理平台的上下文如下图1所示.
图1 通信数据管理平台上下文Fig.1 The context of communication data management platform
系统平台将通信网络捕获的多种异构数据收集起来,如通话日志数据、网络访问日志数据、硬件终端汇报数据等,经过管理平台的数据处理与存储,为上层数据分析与用户查询提供数据支持.
通信数据管理平台处理的3种主要数据源是:
(1)用户通话日志记录了移动终端用户在呼叫过程中的相关信息.数据由呼叫所经过的通讯中转站与通讯基站记录并输出,其中用户通话行为包括语音呼叫、简讯收发以及网络访问,日志内容包括呼叫建立、释放等信息.
(2)用户网络访问日志是用户使用无线网络数据业务时通过无线网络传输的数据,包括网页内容、视频内容等信息.
(3)移动终端测量报告是由移动终端测量并通过通讯中转站整理输出的无线测量信息,包括信号电平、通话质量、所在地区、信号干扰等信息.
这些数据存在数据schema庞大、结构层次化以及数据关联复杂等特征,给实时数据处理、存储带来非常大的挑战.原始数据记录为树形结构,且分支较多(存在可选数据结构、属性),记录属性个数不确定;此外记录内可能存在结构体数组,数据结构表现为层次化、复杂化.如何有效存储此类数据是一个大问题.同类数据、异类数据间存在较多的关联,例如有基于时序的关联、基于用户id的关联.网络带来的不稳定性,数据基于规则的关联与数据流的海量性以及处理要求的实时性产生了矛盾.
2.2 平台架构解析
根据当前通信管理平台的任务要求,平台的功能主要包含3大模块:数据处理模块、数据查询模块以及数据存储模块,如图2所示.
图2 通信数据管理平台功能模块图Fig.2 Function block diagram of communication data management platform
数据处理模块实时接收3种数据源的输入数据,依据数据源间的关联规则进行数据的关联拼接操作,并进行大量指标运算,最终将处理完成的数据以统一的数据模型存储至文件系统中,完成数据存储.当前数据处理模块主要包括以下两部分业务.
数据拼接:数据收集平台接收到3种数据,依据数据间的关联规则进行连接.通信基站会监控用户通话、短信或者网络访问行为,并输出用户通话日志;在通话日志输出的过程中,若该通话为网络访问,通信基站会随着访问网址的变化生成不同的用户网络访问日志;无论通话与否,移动终端会定时发送终端测量报告.3种数据源在处理过程中,每条用户通话日志在时间轴上均可能与多条网络访问日志与多条移动终端测量报告相关,在数据收集阶段,要求将3种数据源依据时间与用户标识进行数据关联,形成一条记录所有测量信息的完整用户通话记录.但由于可能出现时序混乱、数据丢包以及数据延迟等情况,会导致大量数据因缺少部分信息而暂时无法完成拼接的情况,因此如何处理大量因未完成拼接而暂时堆积的数据也是该模块面临的主要困难之一.
指标计算:提取已完成拼接的完整用户通话记录中部分指标(attribute)字段,依据运算规则生成新字段以及对应值.该数据处理模块在通信系统中较为常见,其意义在于将已有数据中大量无法被用户解读的参数信息进行计算,转换为可以被用户理解的指标信息.
数据查询模块接收上层用户提交的查询,并转化为对当前已存储数据的逻辑查询任务.由于上层用户查询主要为离线分析类查询与在线实时查询.因此数据查询模块也分为两部分:针对离线分析类查询的数据分析查询模块以及针对在线查询的实时定位查询模块.其中,数据分析查询模块主要针对某些列数据操作,即依据多个维度进行数据分组、汇总、排序等复杂逻辑操作,最终返回数据分析结果;实时定位查询根据用户提出的数据筛选条件,在海量数据中迅速定位到符合条件信息,并返回给用户.由于其在线交互式的特点,对于数据处理的实时性要求非常高.
数据存储模块提供高压缩率的数据序列化方案以及对复杂结构数据的存储支持,此外,由于数据业务特点的差异——数据分析业务需要提供自适应的数据列重组以支持针对特定业务的分析和查询工作.因此系统需要提供较高的列访问性能,而实时查询业务要求以单条记录的形式返回结果,需要较好的行访问效率,因此文件存储模块需要提供基于应用的数据模型动态生成支持.
3 平台技术方案设计
从上文针对数据结构以及业务模块的介绍中可以看出,通信数据管理平台无论是从数据复杂性、业务动态性上都具有很大的挑战,无法简单地通过套用其它领域的分布式解决方案来解决当前问题.因此本文根据应用的需求,利用当前流行的分布式数据处理技术来架构一套针对通信业务的分布式解决方案.
3.1 技术选型约束
平台技术方案的选型需要考虑硬件环境的约束以及数据业务的需求,当前主要的设计约束如下.
(1)硬件环境:采用由高性能服务器组成的小型集群.在早期的分布式系统中,大规模廉价机集群由于其较低廉的成本以及便于扩展的架构而得到青睐.但随着分布式技术与硬件技术的发展,廉价集群较低的性能水平、高设备维护成本以及高空间占用等问题逐渐凸显,小型高性能机集群凭借其灵巧、稳定以及卓越的性能等特点,正逐渐取代传统大型廉价机集群的地位.故本平台采用少量小型高性能服务器搭建集群.在提供了高性能并行运算能力的同时,通过配备大容量内存来提供足够的数据缓存空间,减少系统对于硬盘I/O的依赖.具体测试系统的硬件环境如下表1所示.
表1 测试环境硬件配置明细Tab.1 Hardware configuration of test environment
(2)并发性能:3种元数据通过各个通讯基站汇总并发送至数据处理平台,因此平台将以基站为单位进行数据处理.目前,单个数据平台需要同时处理接近50个通讯基站的数据,必须具备较高的并发处理能力.
(3)业务特点:当前业务分为多种类型,技术特点各异,需要一套完整的解决方案以应对不同特点的数据业务.数据处理业务要求系统拥有高并发的数据批处理能力,并保证高性能的批量数据连接操作;数据分析类查询对数据的列访问性能要求较高;实时查询则要求系统实时性较强,能够提供海量数据下的快速单点或范围查询性能.
3.2 选型方案设计
通信数据管理平台在构建过程中需要针对2个模块进行技术选型——分布式文件存储方案以及分布式管理系统.
当前分布式环境下最为通用的文件系统为Hadoop Distributed File System(HDFS).HDFS文件系统可以兼容各种存储格式以及上层的分布式框架,因此HDFS是良好的文件系统方案之一.存储格式方面,根据不同的应用要求,可以在HDFS上使用Text文档、以键值对的形式存储的序列化文件、二进制序列化存储以及各种列存储变体.常见的格式有Thrift、Parquet、RCFile[13]等.
分布式管理系统方面主要有3类方案——分布式运算框架、分布式数据库以及分布式查询引擎.分布式运算框架包括应用最为广泛的Hadoop以及基于内存有着更多优化策略的Spark;分布式数据库选择当前较为流行的HBase,分布式类SQL查询引擎包括基于硬盘的Hive以及基于内存的Shark、Impala等.
依据平台应用目标,后续内容通过对上述技术进行测试,最终选定平台构建的技术方案.
(1)文件存储方案
分布式文件系统方面由于可选方案有限,因此本系统选择HDFS作为分布式文件系统.其在稳定性与容错性上的优势可以保证业务的正常运转,而HDFS对于多数存储格式的高度兼容也为存储格式的选择提供了更广泛的空间.但由于平台在数据处理的输出以及查询业务的输入端需要对大量数据进行I/O操作,因此基于HDFS实现高数据吞吐量是本系统需要达到的一个目标.数据的存储格式方面,当前方案主要集中为3类:通过HBase进行管理的key-value对;由 Hadoop支持的键值对存储方案有Sequence File;以及包括RCFile与Parquet在内的二进制列存储方案.
表2 4种存储格式的性能比较Tab.2 Performance comparison of four storage formats
表2对比了各种存储格式在存储性能与访问效率上的对比.首先从存储空间的角度考虑,上述4种数据中,Sequence File作为非序列化方案,其空间占用相对较大.通过测试发现,由于原始数据量较大,若采用非压缩Sequence File键值存储格式,数据量将达到PB级,即使对数据进行压缩,其占用的空间也将接近集群的存储空间上限,无法满足系统的存储容量要求,因此需要一种二进制序列化的存储格式.此外,由于当前数据分析业务中存在大量列扫描操作,因此数据存储方案倾向于使用列访问性能较好的RCFile和Parquet列存储格式.当前几种二进制序列化方案中,RCFile与Parquet是两个性能相对较好的列存储方案.RCFile根据HDFS数据块的大小,首先将数据按照行切分,再将每个数据块内的完整记录按照列切分存储.作为行列混合存储方案,该方案能很好地支持行访问与列访问,而Parquet为单纯的列切分方案,但由于其良好的数据组织形式与访问性能优化,基于Parquet的列访问效率明显高于RCFile,且行访问效率劣势较小,考虑到当前业务中大量的列访问需求,Parquet在性能上具有较大优势.此外,Parquet对于通信数据中多结构体嵌套等复杂数据结构能够提供较好的支持,而通过网络传输数据时,网络传输格式Thrift可以快速便捷的转换为Parquet格式,因此将Parquet作为数据的存储格式.
(2)分布式管理系统方案
根据当前业务特点,通信数据管理平台主要面临两类系统需求,数据处理业务以及实现查询业务.由于业务逻辑较为复杂,无法通过简单的SQL语句进行表达,因此HBase等分布式数据库以及SQL查询引擎均无法支持,主要可选方案为Hadoop以及Spark.实时查询的业务逻辑较为简单,但需要在海量原始数据中,根据条件筛选出极少量记录,并实时返回查询结果.这类业务更适合使用成熟的数据库技术实现.
根据数据处理与分析查询业务的需求,将Hadoop与Spark作为主要的平台构建方案.与此同时,为了尽可能减少方案的技术复杂度,优先考虑在Hadoop或Spark上实现实时查询业务.因此对分布式管理系统的性能要求包括:秒级响应的实时性、较高的系统资源利用率、任务执行效率以及较强的并发能力.
图3 Hadoop(a)与Spark(b)实时响应能力对比Fig.3 Comparison of real-time response speed on Hadoop(a)and Spark(b)
基于通信业务中的一个经典查询,图3与图4比较了Hadoop与Spark在查询响应速度与系统资源利用率方面的优劣.图3从任务开始时,对系统资源利用率(CPU与I/O利用率)进行了监控,展示了Hadoop与Spark系统在实时响应能力上的比较.从图中可见,在任务提交后,图(a)中Hadoop系统经历了接近25 s左右的低资源利用率阶段,之后才开始任务执行过程,而图(b)中Spark的任务延迟时间极短.后续经过大量测试发现,Hadoop在任务开始时通常需要消耗20 s左右任务调度的时间,在这期间,主节点在完成任务分发与资源调配后从每个节点上开启任务,而Spark根据任务的复杂程度,需要消耗300 ms至几秒钟的任务调度时间,从实时性的角度来讲,Hadoop无法满足实时定位查询的性能要求,而Spark在实时响应上表现较好.
图4 Hadoop(a,b)与Spark(c,d)在有无数据缓存下的性能对比Fig.4 Performance comparison of Hadoop(a,b)and Spark(c,d)with or without memory cache
图4则展示了Hodoop和Spark在数据缓存在内存中以及数据在硬盘中时,查询任务执行过程与资源利用情况.通过CPU曲线与硬盘I/O曲线可以看到,当数据缓存在内存时(图4左a,c)两者的CPU资源利用率均较高,但Hadoop存在间歇性的CPU低谷,这是由于Hadoop依据MapReduce架构将数据拆分为多个Map块,并且依据CPU并发能力进行分批处理.当第1批Map接近同一时间处理完成时,由于系统需要调度第2批Map执行,因此出现了CPU暂时处在无任务状态,导致CPU低谷,影响了系统利用率.当数据存储在硬盘时(图4右b,d),Hadoop受限于硬盘I/O效率而影响了CPU的利用率,Spark则受影响较小,其原因在于Spark对本地化的数据读取的优化较好,大多数数据从HDFS中读取时,更多定向至本地磁盘数据,且在不同的RDD任务之间,数据交互也尽可能减少,从而降低了网络延迟带来的影响.从上述的实验可以发现,Spark无论是在系统执行效率还是实时性上均优于Hadoop,因此Spark能够更好地支持通信数据的查询和分析.
此外,在后续的实验中发现,虽然Spark在任务的实时性上性能较好,但作为优秀的数据批处理架构,面对实时定位查询业务需要从海量数据中实现精确定位,其效率仍不能满足需求.而HBase虽然可以提供高效的定位查询能力,但面对通信数据复杂的数据结构,简单地采用表结构存储数据显然不能满足要求.因此在最终方案里将HBase引入,用于存储数据的索引表,通过利用HBase高效的定位能力,在数十ms内完成索引的检索,并使用Spark扫描索引指向的剩余目标文件,最终实现结果的快速返回.
最终,如图5所示,通信数据管理平台的技术选型划分为两层:分布式文件存储层以HDFS为底层文件系统,Parquet作为文件存储格式,并使用Thrift文件格式进行网络通信数据的传输,此外还包含部分HBase所生成的索引表;分布式管理系统层则以Spark作为核心的分布式计算系统,并引入HBase对数据建立查询索引,提高查询效率,最终形成了一套完整的分布式解决方案.
4 平台实现与性能优化
基于当前技术选型方案,本文完整实现了通信数据管理平台的全部功能.本节将针对实现过程中的几个技术难点与解决方案进行介绍.
图5 通信数据管理平台技术框架Fig.5 The technical framework for communication data management platform
4.1 文件系统性能优化
当前在平台业务流程中涉及多次针对文件系统的读写操作,包括数据处理阶段,具体的,从文件系统中读取原始数据以及输出处理后的数据记录等;数据分析查询阶段针对大量数据的读取与扫描;以及数据实时定位查询中的数据过滤.而无论是数据处理还是数据查询,对于任务时间的要求都较为严苛,这对文件系统的I/O性能提出了较高的要求,而任务在硬盘I/O时所消耗的时间也将直接影响平台的性能.对此,针对HDFS的I/O性能进行测试以检测其是否可以满足平台的需要.
图6 单机硬盘I/O与HDFS I/O性能对比Fig.6 Performance comparison of I/O speed between single disk and HDFS
如图6所示,通过对单台高性能服务器(12块2TB SATA硬盘并行I/O)的硬盘I/O性能以及HDFS下分布式硬盘I/O性能进行比较发现,相较于单机硬盘平均700 MB/s以及2.5 GB/s的硬盘写入与读取速度,HDFS下250 MB/s的写入与600 MB/s的读取速度在性能上降低了很多,这是由于分布式文件系统中数据的备份与负载调整等额外代价所导致的,而通过对8台机器组成的HDFS集群整体进行I/O性能测试发现,系统I/O性能接近1.5 GB/s以及4 GB/s的写入与读取速度,整体性能损失明显,相较于每半个小时100 GB左右的数据读取与写入量,其较差的I/O性能一定程度上会影响业务的执行效率.为此,设计中采取以下两方面的优化.
针对大数据量的问题,尝试引入压缩方法,并针对Spark下较为通用的压缩策略进行测试,表3展示了RCFile数据分别在不采用压缩(RCFile默认采用Run Length Encoding编码算法)、采用默认GZip压缩以及采用BZip2压缩方法下压缩率以及访问时间的对比(使用基于业务的数据模型,多台中端性能PC机组建的分布式测试环境).通过实验可以得出,使用GZip压缩可以在有效缩减数据存储空间的同时提升数据的访问效率.
表3 200万条数据写入(原始数据590.9MB)压缩效率对比Tab.3 Comparison of compression efficiency on 2 million record inputs(original data size 590.9 MB)
表4 800万条数据写入(原始数据2.2GB)压缩效率对比Tab.4 Comparison of compression efficiency on 8 million record inputs(original data size 2.2 GB)
另一方面,鉴于集群系统内存空间较为充足,且Spark系统对于数据的内存缓存提供了较好的支持,设计中考虑将部分频繁使用数据缓存至内存中,以减少硬盘的访问频率.对于数据处理模块,每半个小时数据在处理过程中会出现大量因数据晚到现象而出现的无法完成拼接的残缺数据,这部分数据需要在下一个时间段时重新读取和新到达数据一起再次进行拼接,针对这部分未完成拼接的数据,可以通过将数据缓存在内存中,从而减少每个任务加载的数据量.
通过在系统实现过程中添加上述两类优化,使得每半小时的系统输入数据量从近100 GB缩减至40 GB,而数据处理模块从原始每半小时数据处理时间超过45 min缩短至20 min左右(忙闲时不同时段数据处理耗时存在差异).利用数据压缩与内存技术使得硬盘I/O方面的性能损失得到了一定程度的弥补.
4.2 实时查询性能优化
由于数据分析查询模块对于处理时间的要求较低,在实现过程中不存在严重的性能瓶颈.而实时查询模块作为用户交互式查询,其高实时性要求成为平台实现过程中的难点.实时查询的业务特点是在海量的数据记录中(通常为百亿条记录,1TB左右的数据量),通过用户ID精确定位某位用户或多位用户的通话信息(通常为几十至上百条,KB级数据量),并且需要处理多用户并发查询任务.对于这样的业务特点,使用诸如HBase这样的分布式NoSQL数据库将具有更优秀的性能,然而,使用数据库进行查询则需要将查询原始数据存入数据库.考虑到查询原始数据需要兼顾数据分析业务,因此若使用HBase进行数据存储,则需要同时满足对数据扫描的性能需求.基于上述需求,本文对分布式数据库HBase进行了性能的测试.
图7为HBase多线程扫描性能的对比.在集群平台下实验,对1.3亿条数据进行扫描,每条数据100 byte,分别返回百分之一,百分之五和百分之十的数据.结果显示HBase即使在多线程条件下,数据扫描的速度也仅限于25万条/s,对于数据分析类业务上百亿条记录的数据量,HBase的扫描性能显然无法满足需求.因此,使用HBase作为查询原始数据的存储与查询工具无法达到性能要求.
图7 HBase多线程扫描性能Fig.7 Performance of HBase multi-thread scan
另一方面,采用Spark系统进行实时查询,通过对原始数据进行并发扫描的方式获取结果,在不考虑并发查询的情况下,获得了接近10 min的查询性能.由此,本文提出了如下的优化策略:
(1)建立基于用户ID的索引.由于单一用户的通话记录在海量通话信息中分布较为稀疏,当前进行数据扫描过程,针对大部分文件的扫描均不会得到有效结果.故可以对所有记录进行离线数据扫描,并根据每一个用户ID建立针对文件的倒排表索引.当用户发出针对某个用户ID的查询时,可以通过索引确定存在该用户记录的所有文件路径,并扫描对应文件即可快速获取记录.
(2)降低文件存储粒度.当前每30 min数据存储为一个数据文件,存储粒度过大,导致单一用户ID在大多数文件中都存在数据分布.为增加基于用户ID索引的数据筛选率,系统将每个时间周期的数据拆分为多个小粒度文件,通过实验表明,随着文件粒度的降低,单一查询所涉及的数据扫描量在一定区段内呈近线性下降趋势.
(3)设计优秀的索引访问方案.考虑到查询数据以Parquet格式存储,数据结构复杂且数据量较大,无法通过分布式数据库以数据表的形式直接进行管理,因此使用“查询索引+数据扫描”的方案解决实时查询问题.数据扫描部分沿用Spark进行数据的并发扫描,而查询索引由于数据结构较为简单,数据量较小(当前用户ID数量约为6 000万左右),适合使用数据库存储,故采用分布式NoSQL数据库HBase进行索引存储.前期通过实验获知,HBase针对单条记录的查询性能在50 ms左右,并且支持多达上千的查询并发,而查询性能与原始数据库记录的数量关联不大.因此在实时查询的索引环节,查询性能非常优异.此外,由于每半小时会新增一批数据,需要对新数据添加索引,这对HBase的数据插入性能也提出了要求.据此,本文对HBase的插入性能进行测试.如表5所示,在单条记录1 KB大小的条件下,HBase的随机插入性能超过30万行/s,而实际业务中针对半小时数据的索引更新需要40 s左右的时间,性能可以满足需求.
(4)查询结果分段返回策略.当前交互式查询对性能的较高要求主要源于用户在发送查询请求后,需要长时间等待结果,用户对系统的不满程度与界面空响应时间呈指数关系.因此,尝试将最先获取的查询结果立即返回,并随着查询过程分段反馈结果也是另一种减少界面空响应时间的办法.由于当前使用的索引机制可以保证每个文件的扫描都可以确定返回查询结果,因此可以采用分段扫描数据的方式来获得第一条查询结果的快速返回.
通过上述优化措施,平台最终实现了实时定位查询性能从10 min提升至28 s的显著性能变化(其中首条查询结果返回时间为7 s),达到了30 s内完成查询的设计要求.
表5 HBase数据插入性能测试结果Tab.5 Performance of insertion on HBase
5 总 结
通信数据管理平台由于具有复杂的数据特点以及较高的数据处理与在线查询性能要求,使其无法通过单一的技术手段来满足该平台的性能需求.此外,传统基于硬盘的技术受限于硬盘I/O速度,使得交互式系统应用的实时性要求很难得到满足.而随着内存技术的发展,基于内存的分布式技术得到广泛应用,使得分布式在线查询系统成为了可能.本文通过对当今流行的内存与硬盘分布式技术进行分析与比较,采用多项开源技术融合的方式实现了通信数据管理平台的设计;并在大容量内存环境下,通过优化业务的内存缓存方案,达到了分布式环境下高实时性与高吞吐量的性能要求,为复杂的通信业务提供了一套完善的分布式内存系统解决方案.
[1] DEAN J,GHEMAWAT S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.
[2] Apache Hadoop.http://hadoop.apache.org.
[3] 陈勇.基于Hadoop平台的通信数据分布式查询算法的设计与实现[D].北京:北京邮电大学,2009.
[4] ZAHARIA M,CHOWDHURY M,DAS T,et al.Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[R/OL].2011[2014-08-30].http://www.eecs.berkeley.edu/TecRpts/2011/EECS-2011-82.html.
[5] Apache Spark.http://spark.apache.org.
[6] ZAHARIA M,CHOWDHURY M,FRANKLIN M J,et al.Spark:cluster computing with working sets[R/OL].2010[2014-08-31].http://www.eecs.berkeley.edu/Pubs/TecRpts/2010/EECS-2010-53.pdf.
[7] Cloudera Impala.http://impala.io.
[8] Apache HBase.http://hbase.apache.org.
[9] SHVACHKO K,KUANG H,RADIA S,et al.The hadoop distributed file system[C]//Mass Storage Systems and Technologies(MSST),2010 IEEE26th Symposium on.IEEE,2010:1-10.
[10] SHAFER J,RIXNER S,COX A L.The hadoop distributed filesystem:Balancing portability and performance[C]//Performance Analysis of Systems &Software(ISPASS),2010 IEEE International Symposium on.IEEE,2010:122-133.
[11] BORTHAKUR D.The hadoop distributed file system:Architecture and design[EB/OL].Hadoop Project Website.2007-11-21[2014-08-30].http://hadoop.apache.org/core.
[12] ENGLE C,LUPHER A,XIN R,et al.Shark:fast data analysis using coarse-grained distributed memory[C].SIGMOD,2012:689-692.
[13] HE Y Q,LEE R B,HUAI Y,et al.RCFile:A fast and space-efficient data placement structure in MapReducebased warehouse systems[C].ICDE,2011:1199-1208.