基于MongoDB与Hadoop MapReduce的数据分析系统性能改进研究
2019-11-19杨浩
杨 浩
(榆林职业技术学院 质量管理办公室, 榆林 719000)
0 引言
MongoDB是介于关系数据库与非关系数据库之间的基于BSON(Binary Serialized Document Format,二进制串行化文档格式)格式的一种分布式文档存储系统,是一种NoSQL数据库,具有二级索引和较强的SOL性能。MongoDB以文档为单位存储数据,数据结构由键值对(key->value)组成,字段值可以是文档、数组或文档数组。
MapReduce是Hadoop的核心部件之一,是一个分布式批处理计算框架。MapReduce框架采用Master/Slave结构,master节点负责资源管理与调度, Slave节点执行分布式计算。MapReduce1.0中的master节点就是JobTracker节点,只有一个,既要进行资源管理又要进行作业调度,任务很重。JobTracker进行资源分配时根据任务数进行,而不是根据任务对资源的实际需要进行分配,容易产生内存溢出。另外,存在资源划分不合理现象,MapReduce把资源打包为slot,包括Map slot和Reduce slot,前者供Map任务调用,后者只能供Reduce任务调用,即使一方空闲,也不能被另一方调度使用。为了克服上述缺点,MapReduce2.0引入了YARN,把原来由JobTracker执行的任务进行了分解,资源分配交给了ResourceManager,由YARN专门负责,任务调度与监控交给了ApplicationMaster, MapReduce就是运行在YARN上的计算框架。
关于MongoDB和hadoop MapReduce单方面性能改进的研究较多[1-2],也有学者对MongoDB内置的MapReduce性能进行过研究,但研究结果缺乏数字表述[3-4],曾强等人通过优化MongoDB-Connector for Hadoop的内部工作机制,对其性能进行了测试评估,认为,根据输入、输出密集程度不同,分别将MongoDB和HDFS灵活配置为Hadoop MapReduce的输入源或输出目标可以取得更好的数据交换性能[5-6]。
本文剖析了MongoDB和Hadoop MapReduce的数据交换机制,在原有项目实践的基础上,总结形成了改进MongoDB与Hadoop MapReduce数据交换性能的措施,并对应用效果进行了分析论证。
1 MongoDB与MapReduce 数据传输机制
目前MongoDB与MapReduce之间数据传输主要采用如图1所示。
图1 MongoDB与MapReduce整合模式
图1主要由三部分组成,依次是MongoDB Cluster(存储节点)、MongoDB-Connector for Hadoop(连接器)以及MapReduce Cluster(计算节点)。其中,MongoDB Cluster 负责对海量数据进行分片存储,MapReduce Cluster承担并行计算,MongoDB-Connector for Hadoop是数据存取的核心组件,由MongoDB公司开发,该组件为MongoDB替代HDFS作为Hadoop计算框架MapReduce的数据存储节点提供了接口,为计算节点快速读写MongoDB集群的数据提供了保证。
MongoDB-Connector for Hadoop连接器还包括对Pig和Hive的支持,这使得非常复杂的MapReduce工作流可以通过编写更简单的脚本来执行。通过Hive访问MongoDB的方法有两种,一种是直接建立hive表和mongodb表的映射关系,通过对hive表执行HSQL查询来实现对mongodb表的查询,然后将结果返回,这种操作会导致两种问题,需要引起注意:一是如果权限设置不合理,则在Hive中删除表时,同时删除MongoDB中的对应表;二是如果MongoDB的表数据量很大,则执行HSQL操作是会变得很慢。另一种访问MongoDB的方法是采用BSON文件 。先从mongodb中导出BSON文件(mongodump),再将BSON文件写入Hive,然后用HSQL对Hive进行操作,在这种方法中序列号和反序列化操作会消耗较大的系统开销,特别是当MongoDB文档过大时,会造成网络堵塞。另外,在与Hadoop的MapReduce组件进行数据交换时,在数据分片等方面与HBASE相比需要额外开销,影响数据交换效率。
2 性能改进
2.1 设置合适的chunk size和分片方式
chunk的拆分与迁移发生在数据更改和插入操作时,频繁的splitting和balancing会消耗shard server的很多IO资源。chunk size的设置对数据迁移时的资源开销具有较大的影响。chunk size过小,shard server之间数据分布会得到很好的平衡,数据迁移速度就快,但chunk分裂频繁,会消耗掉较多的router process资源,chunk size过大,shard server之间数据分布就有可能失去平衡,chunk分裂就少,一旦有数据迁移,则量较大,会集中消耗掉更多IO资源[7,.8,9]。
chunk size默认值64M,在实际应用中,要根据具体的业务类型,设置合适的chunk size值 ,以减少系统资源开销、获得更大性能。
sharded cluster中,分片的方式有两种,一种是基于范围的分片,将集合中的数据按片值范围进行分类,另一种是基于哈希索引的分片。前者适合基于片键的范围查询,后者适合数据的随机读取与写入,所以要结合业务特征灵活选择分片方式。
2.2 合理部署MongoDB分片集群Sharded Cluster
MongoDB 的Sharded Cluster通过将数据分散存储到多个Shard上来实现性能扩展。元数据服务器Config Server 存储着Sharded Cluster的元数据,mongos进程为应用访问接口,mongos进程检测到读写请求时,从 Config Server 读取所访问数据的路由信息,并将读写请求转发到相应的 Shard 上。
集群中的mongos可任意扩展,所有的 mongos 是对等关系,用户可通过任意一个或多个mongos访问分片集群。如果将用户读取请求均匀的分散到多个 mongos 上,同时将数据分散存储到Sharded Cluster的所有shard上,可以有效实现负载均衡[10-11]。
下面是连接分片集群的代码:
MongoClientURI myconnectionString = new MongoClientURI("mongodb://:123456 @ 192.168. 250.2:3717,192.168.250.3:3717,192.168.250.4:3717/admin");
MongoClient myclient = new MongoClient (myconectionString);
MongoDatabase mydatabase = client.getData base("test");
MongoCollection
这样,系统就会自动将用户请求分散到集群中的所有 mongos 上,以实现负载均衡;另外,当某个mongos 故障时,系统自动进行故障转移,将应用请求转发到正常的 mongos 上。
config.shards组件存储着集群中所有Shard的信息,通过操作config.shards可动态的向Sharded 集群中增加或移除shard。
所以,将config.shards中分片shard的地址替换为myconnectionString中IP地址,就可以使mongos集群系统中的机器直接访问shard上的数据,从而实现数据的并发处理。
2.3 平衡CAP
CAP是NoSQL的三大理论基石之一,C(即Consistency)指的是一致性,A(即Arailability)指的是可用性,P(即Partition Tolerance)指的是分区容忍性,在同一个NoSOL系统中往往很难同时获得三个方面的最大性能,必须以牺牲一方或两方的性能来获得另一方面的最大性能。分区容忍性是基本要求,必须要有,在一般系统中很少要求强一致性,而对可用性要求较高[12,13]。鉴于此,在对MongoDB进行读写操作时,要针对具体业务,进行灵活的性能优化设置,如在读取数据时设为为了获得最大的可用性(响应时间)可以牺牲一致性等。为了提高性能,在分布式系统中往往设置为一个数据节点负责数据写入,然后把写入的数据更新至其他备份节点,而读操作往往在其它备份节点完成,但在主节点写数据成功后,在同步到其它备份节点的时候,经常由于网络故障原因出现延迟写入,这时如果要保证相关节点的数据一致性,就必须等到网络故障恢复,才能完成节点之间的数据一致性,其代价是牺牲了系统的可用性。
2.4 构建内存数据库与磁盘数据库的混合分区
MapReduce在处理数据时,先将需要的数据一次从MongonDB中读取到内存进行处理,同时把中间结果暂存在内存,待数据处理完后,将结果一次性存入内存,以节约IO开销。
但是会存在以下问题:服务器内存有限,遇到大文档数据时,内存难以一次存储所有数据;内存为半导体介质的存储器,一旦掉电或重启,内存数据全部丢失;如果有新的节点加入,无法立即对新加入的计算机(服务器)空间进行寻址。
为了避免主机掉电或系统重启带来的数据丢失问题,可以设置读数据操作在内存数据库(Memory DB)中进行,当内存数据库中找不到时,再去访问磁盘数据库(Disk DB),写数据时则直接写入磁盘数据库中,不会影响内存数据库访问速度,内存数据库定期与磁盘数据库进行数据同步,从磁盘数据库中把新写入的数据读入内存,或把内存计算结果写入磁盘数据库,这样既保证了读写速度,又不会丢失数据。
混合分区的每个分区由一个内存数据库和一个My SQL关系数据库构成,形成水平方向多个分区,垂直方向二级数据库分区(内存数据库和MySQL数据库),解决水平扩展性差的问题[14]。
另外,可以借鉴Spark计算框架的持久化技术,将经常要访问的数据缓存到内存(rdd.cache()或rdd.persist(MEMORY_ONLY)),这样当遇到动作类型的操作时,就免去了从硬盘调用数据的开销,当数据使用完毕后将内存清空(rdd.dispersist())即可。在map shuffle过程中,系统启动combin(合并)操作的阈值是3(即3个溢写文件),即当溢写文件大于等于3时启动combin操作,否则不启动combin操作。在实际应用过程中,应根据实际需要调整阈值,以提高性能。
2.5 构建有向无环图降低系统开销
在机器学习、图计算等迭代算法中,前一阶段的计算结果往往会作为下一阶段的计算输入,但在MapReduce框架中,都是把中间结果写入到存储系统,然后再根据需要调入内容,这样会造成大量的数据复制、磁盘IO和序列化开销[15]。可以参照RDD(Resilient Distributed Datasets,弹性分布式数据集)设计理念,构建GDA(Directed Acyclic Graph,有向无环图),以管道化的方式把前一个操作的处理结果转发给下一个操作作为其输入,从而避免中间结果的存储,降低数据复制、磁盘IO和序列化开销。
2.6 计算本地化
移动计算要比移动数据的开销要小的得多,所以将计算代码移动到存储数据的节点进行计算。在同一个集群中同时部署hadoop和MongoDB,使数据节点和计算机节点部署在同一个节点上,实现“计算向数据靠拢”,在数据节点上进行计算,减小数据传输与序列化开销。同时,设置MapReduce的分片(split)大小最好与shard中的chunk size值相同,以避免Map函数跨节点处理分片。
2.7 设置预测机制
当得到数据的计算节点的资源,正被其它任务占用时,是把当前节点的数据移动到其他空闲节点上进行计算,还是等待其它任务释放资源呢?需要进行计算比较,如果当前节点结束任务的时间比移动数据所花费的时间还小,则不必移动数据,等待释放资源;如果当前节点所需计算资源不能够及时释放,且数据量不大,移动数据不会带来网络堵塞,则可以移动到其它空闲节点上进行计算。
3 应用分析
上述MongoDB与Hadoop MapReduce数据交换及性能调优的改进措施是在某校学生网络舆情分析与某超市顾客消费行为分析两个项目的研究中,经过反复实践与探索形成的。学校舆情分析项目中数据集以大约8000条/天的速度增加,超市顾客消费记录以大约1万条/天的速度增加,在分析当天或近几天的数据时,由于数据量小,参数调整对系统整体性能影响不大,但是在对历史数据进行比较分析时,不同的Chunk size值、不同的MongoDB Sharded Cluster部署、不同的CAP设计等对系统性能产生了明显的影响。
在对学生年度舆情分析及超市季节性消费行为分析时,数据集均达到了300万条以上。这时将chunk size设为32MB、同时将MapReduce的split大小也设为32MB时,系统性能明显优于其它值。另外MongoDB集群要比单MongoDB性能优越,本地计算比移动数据也更优越。
系统研发过程中,对程序编写思路也进行了改进,让程序按有向无环图方式执行,以避免中间结果的存储,降低数据复制、磁盘IO和序列化开销,其效果也是非常明显的。
在进行混合分区与持久性技术实验时,系统性能并没有明显的变化,其原因是数据集不够大,因为从理论上分析,这两项改进措施对处理GB级数据时效果较明显。
以超市全年顾客季节性消费行为大数据分析为例,对此MongoDB与Hadoop MapReduce集成数据分析系统进行性能优化的前后对比关系如表1所示。
表1 优化前后的性能对比
性能改进效果通过在相同环境下执行同一任务所消耗的时间来判断,该时间为执行任务的结束时间与开始时间之差。每项优化措施在前一项优化措施的基础上进行。
项目实践时,首先将单MongoDB扩展为MongoDB集群,程序运行时间减少了2.891秒,在此基础上同步优化chunk size值和split值,程序运行减少了1.022秒,继续将移动数据改为移动计算,运行时间又减少了1.220秒,将程序按有向无环图的思想编写,运行时间减少了0.562秒,但是在利用混合分区与持久化技术进行性能优化时,未得到明显效果。程序运行节省的总时间为5.695秒。所以所采取的性能优化措施是有效的。
4 总结
对MongoDB与Hadoop MapReduce集成系统的性能改进研究具有很大的空间,本文根据项目研究的实践与探索经验,从Chunk size设置与分配方式、MongoDB分片集群部署、平衡CAP、混合分区与持久化技术、计算本地化、构建有向无环图、设置预测机制等方面进行了论述,并对这些改进措施在实践中的应用情况进行了分析。实践证明:
1) 设置chunk size值和MapReduce的split大小对系统整体性能的改进具有明显的作用。
2) 计算本地化与有向无环图技术在,数据量较大且网络性能不佳的情况下,能获得较明显效果。
3) 优化MongoDB分片集群部署、平衡CAP、混合分区在本研究中未取得较明显效果,在理论上当数据量达到GB级别时,能取得明显效果,需要后期深入研究。
但是对MongoDB与Hadoop MapReduce集成系统的性能改进,没有固定的模式,需要根据数据量的大小和计算类型采取合适的措施才能达到性能改进的目的。