APP下载

基于MapReduce的内存并行Join算法研究

2016-08-05许胤龙

计算机应用与软件 2016年7期
关键词:键值线程内存

李 成 许胤龙 郭 帆 吴 思

(中国科学技术大学计算机科学与技术学院 安徽 合肥 230027) (安徽省高性能计算重点实验室 安徽 合肥 230027)



基于MapReduce的内存并行Join算法研究

李成许胤龙郭帆吴思

(中国科学技术大学计算机科学与技术学院安徽 合肥 230027) (安徽省高性能计算重点实验室安徽 合肥 230027)

摘要传统的并行Join算法缺少必要的容错能力,且数据划分不均往往导致单个线程的阻塞成为整个任务执行的瓶颈。针对以上问题,分析内存连接的各个阶段对Join算法性能的影响,提出一种可利用MapReduce的动态机制,避免了传统并行连接算法的数据任务分派不均和容错问题。算法使用MapReduce编程框架,并通过封装分块标记减少MapReduce Join执行过程中标记和排序的计算开销,使算法性能显著提高。实验结果表明,该算法在共享内存体系结构下,性能上相比已有算法有显著改进。

关键词内存连接数据封装MapReduce

0引言

当前,随着大数据时代的来临,MapReduce由于其具有良好的可扩展性和容错性,已经被广泛应用于面向数据处理的应用中。MapReduce最初是由谷歌工程师Dean等人在2004年推出[1],其最初的设计目的是处理公司大规模的网络日志数据访问。MapReduce编程模式通过提供一种简单的编程接口实现在普通机器上的并行和大规模分布式计算,并能将并行数据处理中容错、负载均衡和数据分布的复杂细节隐藏起来,自动完成[2]。

Join 算法是进行两个或者多个数据集聚集连接的操作,是面向数据处理应用的常用算法。优化Join执行的效率,可以有效提升数据分析任务的性能。Join在分布式环境下使用MapReduce并行化的研究成果相对丰富[3-5],但针对内存共享环境下,使用MapReduce对Join算法进行并行化的研究却仍十分少见[10]。然而,随着多核处理器的普及和内存数据库的流行,研究MapReduce在共享内存环境下实现内存Join算法的并行具有重要意义。

传统的内存Join算法,研究内容主要集中在均摊并行处理数据的多个线程的任务和优化cache访问两个方向。其中,一种流行的最新技术是使用Radix-Cluster Hash Join[9]。但是,针对内存多核体系结构,当有多个线程并行处理已经存放到内存的数据时,直接使用以上策略并不能充分发挥其性能。传统并行方式通过多个线程并行执行任务子集,将带来单个线程任务过大时会成为整个查询事务的瓶颈,特别是单个线程查询失败将导致整个查询失败。

本文提出通过引入MapReduce的机制解决传统并行Join算法单个线程成为算法瓶颈或者导致整个任务失败的问题。在标准MapReduce Join算法的基础上,结合多核体系架构的特性,提出了基于MapReduce的 Radix Join优化算法。在该算法中考虑了cache 命中率和MapReduce执行过程中数据分片规模对算法的影响,在减少中间结果规模的同时,保证算法具有良好的cache 敏感特性。在CMP和SMP环境下的实验结果表明,该算法无论是对比传统内存共享并行Join算法还是常用的标准MapReduce Join算法,性能均具有较大提升。

1内存Join算法

内存Join算法优化的研究众多[6-8],并提出了多种针对不同情形下的优秀算法,其中,Radix Join 便是针对等值Join的突出代表。下面将介绍该算法串行及其并行算法的执行过程。

1.1Radix Join

Balkesen等[9]证实了当哈希表大于cache的大小时,几乎每个访问都导致一次cache访问缺失。因此,切分哈希表,使每个哈希表的大小能够小于cache的大小,可以提升系统性能。Albutiu等[7]借鉴该思想,通过考虑传输后备缓冲器(TLB)对性能的影响,提出了多次划分的算法思想。现在该思想已经成为Radix Join算法的标准组成。

完整的Radix Join说明如图1所示。两个输入都是通过使用两次Radix数据划分的方式划分到合适的大小。每个ri由基于哈希划分输入R得到, ri会根据哈希函数进行第二次划分。所有的sj划分的分区会被遍历并与ri所划分成的哈希子表中的表项进行连接匹配。在Radix Join中,为了取得良好的cache特性,避免一次过多的数据片划分产生,两个输入表都需要经过多段的数据划分处理。

图1 Radix Join执行过程[7]

1.2并行Radix Join

对于通过将划分过程中产生的数据子集由相互独立的多个线程并行执行,串行的Radix Join 算法可以实现算法的并行化[8]。在第一阶段,由单独的线程划分数据,并对于每个线程将会产生自己专用的部分数据的数据区域[7]。在第一步数据划分完成以后,各个任务已经具有足够的独立性,可以很好地并行完成各自的工作。线程工作的任务分发通过任务队列实现。通过以上方法,对于一个p核系统,该算法的时间复杂度可以期望为单核的1/p。

上述算法在数据均匀分布时具有较好的并行特性。但是,算法在进行数据划分时,很可能导致划分数据的失衡,从而导致在并行执行阶段中,数据处理时间最长的线程成为整个任务的瓶颈。更重要的是,在并行执行阶段,一旦某个线程处理出了问题,将会导致整个查询任务的失败。

2MapReduce Join算法及其优化

MapReduce的动态调度机制和容错机制,可以很好地解决传统并行内存连接算法的问题。根据MapReduce的特征,MapReduce Join算法可以有两类实现:Map-side Join和Reduce-side Join[11]。由于Map-side Join算法要求数据是有序的[11],因此,本文只关注适用范围更广的Reduce-side Join。

2.1朴素的MapReduce Join算法

如图2所示,Reduce-side Join 算法将输入数据的表项通过Map函数产生中间数据。为了区分R表与S表的表项,通过使用添加标签的方式,产生对应的键值对。标记的键值对以进行连接的项作为键。Map函数的输出将按键的值进行排序。所有的具有相同键的数据会被划归为一组,交由一个Reducer处理。执行过程如算法1所描述。

图2 Reduce-side Join 数据流[11]

算法1朴素的MapReduce Join算法

Require: Input relations R and S for Join operations

1.Map(Key k, Value v)

//map 阶段

2. if(v comes from R)

//标记R表和S表表项

3. tag=1 and Join_key=v.a

4. else

5. tag=2 and Join_key=v.b

6. Output.collect( Join_key, T+tag)

//输出带标签键值对

7.end Map

8.Reduce( key key,List values)

//reduce 阶段

9. for each v in values

10. if(v.tag=1)

//根据标记将数据加入相应数据集

11. add v to ArrayList_R

12. else

13. add v to ArrayList_S

14. for each val1 in ArrayList_R

15. for each val2 in ArrayList_S

16. result=val1 Join val2

//执行join

17. collect(key, result)

18.end reduce

MapReduce的引入,在解决传统并行Radix Join算法问题的同时,也带来了新的挑战。由于在键值对生成过程中,需要以添加标签的方式,让Reducer区分是R表还是S表的表项,使得添加的标签处理太多。同时,标准MapReduce编程框架要求中间结果将按键值进行排序,需要排序的数据规模太大,将严重影响算法的执行性能。另外,为了保持Radix的cache特性,数据的最终划分也需要合理的选择。因此,本文提出了一种新的改进方法。

2.2MapReduce Join算法优化

在上文中介绍了朴素的MapReduce Join算法的实现及其执行过程,并提出了内存Join算法在使用MapReduce框架时带来的挑战。

在多核体系架构下,朴素的MapReduce的算法设计暴露出其弊端。内存和数据的访问执行是通过如图3所示的体系完成的。基于该体系架构内的通信代价几乎是可以忽略的,但是MapReduce标准执行中的标记及排序操作将成为算法的主要开销。分布式MapReduce环境下的Join算法优化,大多关注于网络通信代价的优化,缺少对于内存共享环境下,结合计算机多核特性的Join算法的深入研究。基于以上原因,本文将分析Join 算法的执行过程,从而提出并实现适合内存共享环境下的MapReduce Join算法。针对如图4所示的数据流,根据算法2的执行过程,分别对Map和Reduce两个阶段对算法进行优化。

图3 三层cache的多核体系结构

图4 内存MapReduce Join的数据流

算法2改进的MapReduce Join算法

Require: Input relations R and S for Join operations

1.Map(Key k, Values Vs)

//map 阶段

2. /*使用Radix hash进行第一次划分后封装*/

3. Use the hash1 to split the Vs into blocks Ts

4. if(T comes from R)

//标记R表和S表数据块

5. tag=1 Join_key=hash1(T.a)

6. else

7. tag=2 Join_key=hash1(T.b)

8. Output.collect( Join_key, T+tag)

//输出带标签键值对

9.end map

10.reduce (Key k’, List blocks)

//Reduce 阶段

11. for each T in blocks

12. if(T.tag=1)

//根据标记将数据块对应合并

13. add T to ArrayList_R

14. else

15. add T to ArrayList_S

16. /*使用Radix hash对两个数据集进行第二次划分*/

17. split ArrayList_R with hasp into ArrayList_R’

18. split ArrayList_R with hasp into ArrayList_S’

19. for each val1 in ArrayList_R’

20. for each val2 in ArrayList_S’

21. do result=val1 Join val2

//执行join

22. collect(key, result)

23.end reduce

2.2.1Map阶段优化

在数据划分之后,相互独立的Map任务并行处理分配给自己的数据。各个Map任务通过使用添加标签的方式,对数据表中的每个进行连接的表项进行处理,并产生对应的键值对。在键值对生成过程中,键值对将按键值进行插入排序。由于对单个键值对添加标签,使得添加的标签处理太多,并且,中间需要排序的数据规模太大,严重影响算法的执行性能,本文将对此进行改进。

针对Map阶段的优化,本文使用封装标记法减少算法执行过程中的计算开销。因为在Map阶段需要将所有数据是来自R表还是S表进行标记,针对每个表项进行标记执行代价太高。由于面向的是等值连接,可以使用哈希的方式进行初次的数据划分。如算法2的第3~5行描述,划分后的数据块封装成一个整体,将该数据块的哈希值作为键,包含有封装数据地址的数据结构作为值,生成键值对。通过这种封装的方式将每个键值对的标记将是对每个数据集进行整体标记,减少了大量标记操作。由于中间的结果数据需要进行排序,适当的封装同时也减少了中间结果需要排序的数量。

为了具备良好的cache特性,算法利用Radix Hash函数对数据进行划分。但是对于大规模数据而言(GB级以上),如果Radix哈希的划分分片太少,将不能充分地发挥MapReduce动态调度的优势。可能导致数据分配不均衡,使得单个Reducer任务成为瓶颈而无法充分发挥并行能力。为了解决此问题,本文通过多次划分的方式解决。在Map阶段进行适当规模的数据划分,初次划分的规模,经过实验验证,如图5所示。每次数据划分使用一个字节进行划分,这样产生256个分组,将得到性能最接近最优的划分。以字节的方式划分,可以通过整个字节截取的方式,在减少了中间数据存储开销的同时,也使得一次可以有更多的数据放入cache中,提高cache命中率。对于任务可能的分布不均,将对数据规模超过参数限制(本文中为标准值8倍)的数据块进行多一次的划分。

图5 Radix哈希分块使用位数变化对性能的影响

2.2.2Reduce阶段优化

本文不仅关注Map执行阶段的优化,还关注Reduce阶段的优化。Reducers等待所有的Map任务的返回结果。中间结果中针对每个hash值的数据,都会调用一次Reduce 任务。

每个Reduce任务负责处理多个Reduce数据块(如图4所示)。每个Map任务处理部分的数据,Reducer归并所有部分的数据放在合适的缓存中。Reducer需要计算所有具有相同键值的中间值,并输出数据最终的处理结果。正如本文前面介绍的Radix Join所示,需要设计合理的调度策略来优化数据的访问性能。

如Map阶段所描述,为了减少中间结果排序时间等,将Map阶段划分的数据块数控制在一个较小的规模。这使得每个Reduce任务需要处理的数据规模过大。本文通过对数据进行二次划分,并将数据切分到可以容纳至最低cache层,进行对多个数据集的并行执行。

为了最优化内存驻留的算法,选择一个简单并且合适的内存访问体系模型将非常重要。尽管本文选用的多核结构的内存体系(如图3所示),相对于真实的内存体系稍微简单,但其是不同平台共有的基本框架[12]。本文的算法是cache敏感的,其优化主要针对第一层cache,也就是L1层的cache优化。因为对于不同的平台,高层的cache多有变化,而上层的cache访问往往不会影响最低层的cache访问性能,这使得本文的研究具有普遍的适用性。本文将进行连接的最终数据规模划分到不能超过L1层的cache大小,这样既避免了最低层的cache震荡,还可以适度地优化高层的cache命中率。

3实验及结果分析

为了评价本文的算法,我们在不同环境下实现了文中所提到的朴素算法和改进算法,并对比已有的传统Radix Join算法并行方法。实验运行在一个具有16核的 Intel Xeon SMP 硬件系统上,操作系统为64位版本的CentOS Linux 6.4。

3.1实验数据

为了更好地对比,本文采用的数据集与文献[8]中使用的数据集相似。输入的数据全部是整数,特别是已有算法的假设数据场景是面向列式存储的,所以我们将数据表执行定义为每行仅是键值对的表项,每条记录长度为16 B,键和值的数据类型是8个字节的长整数。实验所测试的数据R表与S表数据规模相等,由于硬件条件的限制,为了防止数据刷到虚拟内存,影响算法性能的测量, 最大数据集为两个表各5亿条记录,共16 GB。

3.2实验设置

本文通过机器和任务参数配置来模拟不同场景下的内存Join算法。使用不同大小的数据输入表进行不同数据规模下的性能对比。实验结果显示,本文的算法性能在所有测试情况下都优于其他的算法。

在前两组实验中,依次在CMP(8核)和SMP(16核)环境下固定核数,使用数据规模为1、2、4、8、16 GB的不同规模输入的数据集进行实验。最后的一组实验将评估本文算法随核数变化的可扩展性。通过固定R表与S表规模(共1 GB条记录),改变处理数据使用的核数,评估算法对于处理器核数变化的可扩展性方面的性能。

3.3实验结果及分析

图6展现了在固定核数的情况下,本文算法与已有相关算法在CMP环境下随着数据规模输入的增大,执行时间的变化。由图可知,在各个数据集下,改进的MapReduce join 算法均优于标准的Radix Join 并行化实现,而朴素的MapReduce Join算法性能最差。随着实验数据规模的增大,各个算法的执行时间都有显著的增大。并且随着数据规模的增大,改进算法相对标准并行Radix Join和朴素MapReduce Join性能提升更加明显,由1 GB数据规模的性能分别提升了28.1%和77.3%,到16 GB数据规模的性能分别提升了46.7%和77.9%。这是因为随着数据规模的增加,MapReduce动态调度更能突出其优势,而朴素的MapReduce Join算法因为大量地添加标签操作以及中间数据排序操作花费了太多时间。实验结果表明,将原分布式环境下MapReduce编程模型简单搬到内存共享环境下并不能取得突出的性能表现,需要根据环境特征重新设计算法,才能取得良好的性能。

图6 Join算法在8核CMP系统上的性能对比

图7展示了SMP环境下(16核)处理与CMP(8核)环境下相同数据集的各个算法的处理时间。由图可知,各算法在16核环境下执行时间都有不同程度的减少,但在SMP环境下,改进MapReduce Join算法相对其他两种算法的性能仍然有很大提升。以实验数据1 GB的第一组实验为例,改进的MapReduce Join的执行时间由CMP环境下的0.983 s下降到0.6196 s,相对于标准并行Radix Join和朴素MapReduce Join性能分别提升了26.9%和76.6%。相对CMP环境下提升虽然略有下降,但基本上和CMP(8核)环境下取得了一致的结果。

图7 Join算法在16核SMP系统上的性能对比

图8展示了对于同一数据集,各个算法随着计算核数变化的执行时间变化。该扩展性测试显示,各个算法随着核数的增加,执行时间逐渐减少,而本文所提出算法的执行时间随核数增加而下降最为迅速。因为随着实际使用核数的增加,将会有更多的线程同时在共享数据的情况下进行数据处理,使得每个Map任务或者Reduce任务处理的数据规模减少。在单核环境下,虽然算法的执行时间稍逊色于标准并行Radix Join,但当核数增多后,由于并行处理数据划分等原因,改进的MapReduce Join算法表现的性能开始超过标准并行Radix Join。并且最终随着核数的增加,算法性能的提升呈现保持的趋势。图8展示的对于扩展性能的测试结果验证了改进后的MapReduce Join算法不仅具有高效性,还具有良好的可扩展性。

图8 Join算法扩展性测试

4结语

本文提出一种新的内存Join算法,且该算法在多核共享内存体系结构下可以取得高效性能。该算法借助MapReduce编程框架,并利用Radix算法的特性,在标准实现上加以改进,解决了传统并行Join算法单个线程阻塞成为整个任务瓶颈以及缺少容错性的问题。通过在Map阶段划分后封装减少中间结果数据规模,解决了因引入MapReduce方式带来中间结果标记和排序开销过大的问题,使得算法在具有了MapReduce良好容错性的同时,具有高效性。新的MapReduce Join算法在多核内存共享环境下,相对于原有算法,在计算性能和良好的可扩展性方面均具有突出的优势。

参考文献

[1] Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters[J].Communications of the ACM,2008,51(1):107-113.

[2] Ranger C,Raghuraman R,Penmetsa A,et al.Evaluating MapReduce for multi-core and multiprocessor systems[C]//High Performance Computer Architecture,2007.HPCA 2007.IEEE 13th International Symposium on.IEEE,2007:13-24.

[3] Afrati F N,Ullman J D.Optimizing Joins in a Map-Reduce environment[C]//Proceedings of the 13th International Conference on Extending Database Technology.ACM,2010:99-110.

[4] Blanas S,Patel J M,Ercegovac V,et al.A comparison of Join algorithms for log processing in MapReduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of data.ACM,2010:975-986.

[5] Afrati F N,Ullman J D.Optimizing multiway Joins in a Map-Reduce environment[J].Knowledge and Data Engineering,IEEE Transactions on,2011,23(9):1282-1298.

[6] Blanas S,Li Y,Patel J M.Design and evaluation of main memory hash Join algorithms for multi-core CPUs[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of data.ACM,2011:37-48.

[7] Albutiu M C,Kemper A,Neumann T.Massively parallel sort-merge Joins in main memory multi-core database systems[J].Proceedings of the VLDB Endowment,2012,5(10):1064-1075.

[8] Balkesen C,Teubner J,Alonso G,et al.Main-Memory Hash Joins on Modern Processor Architectures[J].Knowledge and Data Engineering,IEEE Transactions on,2014,26(3):99-113.

[9] Balkesen C,Teubner J,Alonso G,et al.Main-memory hash Joins on multi-core CPUs: Tuning to the underlying hardware[C]//Data Engineering (ICDE),2013 IEEE 29th International Conference on.IEEE,2013:362-373.

[10] Jiang W,Ravi V T,Agrawal G.A map-reduce system with an alternate api for multi-core environments[C]//Proc of the 10th Int Conf on Cluster,Cloud,and Grid Computing.IEEE,2010:84-93.

[11] Jadhav V,Aghav J,Dorwani S.Join Algorithms Using MapReduce:A Survey[C]//International Conference on Electrical Engineering and Computer Science,21st.2013.

[12] Boncz P A,Manegold S,Kersten M L.Database architecture optimized for the new bottleneck:Memory access[C]//VLDB,1999,99:54-65.

收稿日期:2015-02-11。李成,硕士生,主研领域:数据库优化查询。许胤龙,教授。郭帆,硕士生。吴思,博士生。

中图分类号TP3

文献标识码A

DOI:10.3969/j.issn.1000-386x.2016.07.059

RESEARCH ON MAPREDUCE-BASED IN-MEMORY PARALLEL JOIN ALGORITHM

Li ChengXu YinlongGuo FanWu Si

(SchoolofComputerScienceandTechnology,UniversityofScienceandTechnologyofChina,Hefei230027,Anhui,China) (TheKeyLaboratoryonHighPerformanceComputing,Hefei230027,Anhui,China)

AbstractTraditional parallel Join algorithms lack the necessary fault tolerance capability, and data partitioning inequality often leads to a single thread obstruction which in turn becomes the bottleneck of the whole task execution. In light of the above problem, this paper dissects the influence of each phase of in-memory join on the performance of Join algorithm, and proposes a dynamic mechanism in which the MapReduce is applicable, thus avoids the problems of traditional parallel Join algorithm implementation in unequal data tasks allocation and fault tolerance. The algorithm uses MapReduce programming framework, and reduces the computational cost of tagging and ranking in execution process of MapReduce Join through encapsulating the blocking tags, this makes the performance of the algorithm improve remarkably. Experimental results show that this algorithm has evident improvement in performance for shared-memory architecture.

KeywordsIn-memory JoinData encapsulationMapReduce

猜你喜欢

键值线程内存
基于C#线程实验探究
非请勿进 为注册表的重要键值上把“锁”
基于国产化环境的线程池模型研究与实现
“春夏秋冬”的内存
一键直达 Windows 10注册表编辑高招
浅谈linux多线程协作
内存搭配DDR4、DDR3L还是DDR3?
基于内存的地理信息访问技术
注册表值被删除导致文件夹选项成空白
上网本为什么只有1GB?