MapReduce中Combine优化机制的利用
2013-04-29贾欧阳阮树骅田兴杨峻兴李丹
贾欧阳 阮树骅 田兴 杨峻兴 李丹
摘 要: 由Apache软件基金会开发的Hadoop分布式系统基础架构,作为一个主流的云计算平台,其核心框架之一的MapReduce性能已经成为一个研究热点,其中对于Shuffle阶段的优化,使用Combine优化机制是关键。文章详细介绍了MapReduce计算框架及Shuffle流程;分别从机理简介、执行时机、运行条件三方面详细阐述了如何利用Combine优化机制;通过搭建Hadoop集群,运用MapReduce分布式算法测试实验数据。实验结果充分证明,正确地运用Combine优化机制能显著提高MapReduce框架的性能。
关键词: 云计算; Hadoop; MapReduce; Shuffle; Combine
中图分类号:TP393.2 文献标志码:A 文章编号:1006-8228(2013)09-01-04
0 引言
我们正处在一个数据爆炸的时代,数据生成速度之快令人惊讶:纽约证券交易所每天产生1TB的数据,Facebook存储着约100亿张照片占用了约1PB存储容量[1]。由Apache软件基金会开发的Hadoop分布式系统基础架构,正是为了解决海量数据的存储和计算问题,并且由于其高可靠性、高可扩展性、高效性和高容错性,已经成为一种主流的云计算平台[2]。Hadoop核心框架之一的MapReduce,在性能优化和提高等方面的问题已经被学术界和工业界所关注,而其中很重要的一部分是对Shuffle阶段的优化。本文详细介绍了MapReduce框架和Shuffle阶段流程,研究分析了Shuffle优化过程中利用Combine优化机制存在的问题,通过实验和理论分析找出了解决方案,提出了Combine优化机制的执行时机和运行条件,并利用实例数据充分证明了正确地利用Combine优化机制能显著提高MapReduce框架性能。
1 MapReduce框架
1.1 框架简介
MapReduce是一种能够在普通配置计算机上并行处理大量数据的并行计算框架,使用这个框架,我们只需要编写我们想要执行的简单运算即可,写出的程序自动在集群上并行执行,而不必关心集群之间的调度、数据分布、负载均衡、容灾备份、通信等复杂细节,这些问题都已经被MapReduce框架封装好了。
MapReduce可以让没有任何分布式、并行编程经验的程序员很容易地利用分布式系统的资源[3],用户只需要自己定义map函数和reduce函数,这两个函数的灵感来自Lisp和许多其他函数式语言的map和reduce原语[4],map函数从输入数据中获取键/值对同时生成一个中间键值对集合,reduce函数合并有相同中间键的中间值,从而得到一个想要的结果。MapReduce框架会在任务运行时关注调度任务,并监视任务的执行状况,如果执行失败,将重新执行该任务[5]。
1.2 实现机制
首先输入文件会被用户程序调用的MapReduce库切分成若干64MB大小的数据块,这些数据块在框架中会被自动地创建副本,然后调用fork原语创建数个子程序,在这些子程序中有一个特殊程序master,其他程序都被当作worker程序,一个空闲的worker程序将收到由master分配的一个map任务和reduce任务。
worker程序从输入的数据片段中解析出键值对,然后这些键值对被传递给用户自定义实现的map函数,由map函数处理后,生成的中间键值对最终被写到本地磁盘上,master获取到这些键值对在本地磁盘上的存储位置并传送给负责执行reduce任务的worker节点。
reduce worker程序使用RPC从负责执行map任务的worker节点所在主机的磁盘上读取这些缓存数据,由于存在不同的key值映射到相同的reduce任务上,因此会进行排序,如果中间数据太大无法在内存中完成排序,那么就要进行外排序[5],最终使得具有相同key值的数据聚合在一起。
排序后的中间数据经过reduce worker遍历后,reduce worker程序将每一个惟一的中间key值和它相关的中间value值的集合传递给用户自定义的reduce函数[5]。
当所有的map、reduce任务完成后,用户程序对map、reduce的调用才返回。
2 Shuffle过程
Shuffle过程可以理解为是从map映射输出到reduce消化输入的整个过程,整个过程被称为MapReduce的“心脏”,关乎整个框架性能[1],应用开发人员对MapReduce框架的改进也主要集中在Shuffle阶段。
MapReduce框架中Shuffle流程如图1所示[1]。
2.1 Map端
⑴ 作业提交以后,Map端从输入块中读取record,依次调用map函数进行处理,并映射输出(key,value)键值对。
⑵ 生成的键值对串行化的输出到环形内存缓冲区[7],这期间如果应用程序员没有定制partitioner,那么系统会调用默认的HashPartitioner把键值对划分到对应的partition分区。
⑶ 当缓冲区的内容达到设定的阈值时,一个后台spill线程便开始把这些内容溢写到磁盘[1],spill线程在把缓冲区数据写到磁盘前,会对它进行一个二次快速排序,首先根据所属的partition排序,然后在每个partition内再按key排序[8]。
⑷ 当缓冲区的数据输出到磁盘后,可能会出现多个spill文件,这时候就进入Map端的合并(merge)阶段,会对这些文件做一个归并排序,最后输出一个index文件和一个数据文件,index文件记录了不同的key在数据文件中偏移量[7]。
2.2 Reduce端
⑴ Copy Phase:在Map端,只要有一个任务完成,Reduce任务就开始通过http方式复制其输出,而不是等所有Map端任务都结束之后再进行[1]。
⑵ Sort Phase:Map端输出在被取回的同时,被合并成多个文件,并按键排序,这个阶段更恰当地说,应该只进行了合并,因为排序(Sort)是在Map端进行的[1]。
⑶ Merge Phase:归并过程中的数据可能既有在内存中的也有在磁盘上的,如果都在内存中,则会直接被复制到Reduce端而省去向磁盘写这一步[8],然后通过归并得到最终的文件。
3 Combine优化
3.1 Combine优化机制简介
MapReduce框架的运作基于键值对,即数据的输入是键值对,生成的结果也是存放在集合里的键值对,其中键值对的值也是一个集合,一个MapReduce任务的执行过程以及数据输入输出的类型如下所示,这里我们定义list表示集合:
map(K1, V1) -> list(K2, V2)
combine(K2, list(V2)) -> list(K2, V2)
reduce(K2, list(V2)) -> list(K3, V3)
map函数操作所产生的键值对会作为combine函数的输入,经combine函数处理后再送到reduce函数进行处理,减少了写入磁盘的数据量,同时也减少了网络中键值对的传输量。在Map端,用户自定义实现的Combine优化机制类Combiner在执行Map端任务的节点本身运行,相当于对map函数的输出做了一次reduce[8]。
集群上的可用带宽往往是有限的,产生的中间临时数据量很大时就会出现性能瓶颈,因此尽量避免Map端任务和Reduce端任务之间大量的数据传输是很重要的。使用Combine机制的意义就在于使Map端输出更紧凑,使得写到本地磁盘和传给Reduce端的数据更少[1]。
选用Combine机制下的Combiner虽然减少了IO,但是等于多做了一次reduce,所以应该查看作业日志来判断combine函数的输出记录数是否明显少于输入记录的数量,以确定这种减少和花费额外的时间来运行Combiner相比是否值得[8]。
在具体的分布式应用中,应用开发人员需要自己定制map函数和reduce函数,在本文实验中用到的具体算法如下:
Algorithm 1是分割映射键值对的map算法。map函数的输入分别为文本数据中行偏移量key、每行文本内容text和上下文对象context,算法第1行将输入的每行文档内容数据text分割成单个的单词k,放入集合K中,第2-3行对K中的每个k,映射输出键值对(k,1),这里每个键k对应的值是1。
Algorithm 2是对中间键值对合并的combine算法。combine函数的输入为经map函数分割后的每个单词k、单词k及其统计次数存放的集合K1和上下文对象context,其中键值对的输入键midsum为经过combine算法后分发出的一类键名,用以标识键值对已经经过combine算法处理。算法第1行首先判断输入键是否为midsum,如果不是,则第2-4行对K1集合中的统计次数进行累加,输出键值对midsum作为键,累加结果sum作为值;第5-7行因为输入键是midsum,则直接映射输出,避免轮询造成更大的开销。
Algorithm 3是规约reduce算法。reduce函数的输入可能是combine函数的输出(midsum,sum)(存放于集合K2中),也可能是没有经过combine函数合并的map函数的输出(k,1) (仍然存放于集合K1中),还有上下文对象context,算法第1-2行对K1/K2集合中每个k进行累加求和,第3行输出最终的键值对(endsum,sum),sum即为最终的求和结果。
算法1至3如图2所示。
[Algorithm 1:分割键值对map算法
输入:key, text, context
输出:中间键值对(k,v)
步骤:
1 list(K) ← split text
2 foreach k ∈ K
3 emit intermediate(k,1);][Algorithm 3:规约reduce算法
输入:key, K1/K2, context
输出:求和结果
步骤:
1 foreach k ∈ K1/K2
2 compute the sum of v
3 emit (endsum,sum);\&][Algorithm 2:合并中间键值对combine算法
输入:key, K1, context
输出:中间键值对(k,v)
步骤:
1 if k does not start with midsum
2 foreach k ∈ K1
3 compute the sum of v
4 emit intermediate(midsum,sum);
5 else
6 foreach k ∈ K1
7 emit intermediate(k,v);
3.2 Combine优化机制执行时机
⑴ Map端spill的时候
在Map端内存缓冲区进行溢写的时候,数据会被划分成相应分区,后台线程在每个partition内按键进行内排序。这时如果指定了Combiner,并且溢写次数最少为3(min.num.spills.for.combine属性的取值)时,Combiner就会在排序后输出文件写到磁盘之前运行[1]。
⑵ Map端merge的时候
在Map端写磁盘完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件,按partition循环处理所有文件,合并会分多次,这个过程也会伴随着Combiner的运行[7]。
⑶ Reduce端merge的时候
从Map端复制过来数据后,Reduce端在进行merge合并数据时也会调用Combiner来压缩数据。Combiner通常被看作是一个Map端的本地reduce函数的实现类Reducer,这个实验[9]也验证了这一理论的不足,但是在很多情况下Combiner在Reduce端的作用是有限的。
3.3 Combine优化机制运行条件
⑴ 满足交换和结合律[10]
结合律:
(1)+(2+3)+(4+5+6)==(1+2)+(3+4)+(5)+(6)== ...
交换律:
1+2+3+4+5+6==2+4+6+1+2+3== ...
应用程序在满足如上的交换律和结合律的情况下,combine函数的执行才是正确的,因为求平均值问题是不满足结合律和交换律的,所以这类问题不能运用Combine优化机制来求解。
例如:mean(10,20,30,40,50)=30
但mean(mean(10,20),mean(30,40,50))=22.5
这时在求平均气温等类似问题的应用程序中使用Combine优化机制就会出错。
另外需注意,在撰写含有Combiner的应用时,对于所有map函数的输出,并非一定都经过Combiner,有些会直接进入Reducer。
如果我们在程序中定制了一个Combiner,MapReduce框架使用它的次数可能是0次也可能是多次,为了保证Combine机制的正确运用,Combiner在数据的转换上必须与Reducer等价,如果我们去掉Combiner,Reducer的输出也应保持不变,而且,当Combiner被应用于中间数据的任意子集时,仍需保持等价的转换特性[9]。
⑵ Combine优化机制中存在的轮询问题
在开发过程中,使用Combine优化机制会存在轮询问题,即一个combine函数的输出结果可能会成为其自身的输入[11],经过combine函数处理的数据会再次进入combine函数,但轮询问题是不可避免的,所以要保证combine函数的输入类型和输出类型必须一致,若不一致,要增加判断的逻辑。
4 实验结果分析
4.1 实验环境
本文通过部署一个Hadoop伪分布环境,通过实验对比和分析来验证文中关于利用Combine优化机制理论的正确性。在实验搭建的集群中,NameNode和DataNode都在本机中。
硬件环境:Intel? Core? i5 CPU M 480 @ 2.67GHz×4,3.7GB内存,500GB硬盘。
软件环境:Fedora16,Hadoop 0.21.0,Jdk-6u27-linux-i586。
4.2 实验结果及分析
本文采用数字求和程序作为实验测试程序,采用文本文档作为实验数据集,文本中数据随机生成,正负相间,以防数据全正或全负致使计算结果溢出。
⑴ 实验一
采用418.9MB文本文档数据集进行多次实验,分析实验日志,取其中三组实验结果列表,利用Combine机制优化前后,框架中各部分的输入输出和耗时对比如表1所示。
作业结束后,查看日志,利用Combine机制优化前后作业都分别共执行了12个map任务和1个reduce任务,表1中详细记录了作业执行完成后map函数、combine函数和reduce函数的输入输出数量和耗时,通过表1的数据对比可以看出:一方面,使用Combine优化机制优化后,作业执行时间明显减少;另一方面,从实验数据中可以看到combine函数输入的数量要明显大于map函数输出的数量,而且combine函数输出的数量要明显大于reduce函数输入的数量,所以可以验证在使用Combine优化过程中存在明显的轮询问题,但由于实验中输入输出数据类型相同,所以轮询并不影响实验的最终输出结果,Combiner的输出结果被写到中间文件,并被发送到reduce任务中,经reduce函数处理后直接被写到最终的输出文件中,保存在HDFS文件系统上。
⑵ 实验二
分别采用41.9MB、83.8MB、167.6MB、335.1MB、418.9MB文本文档作为测试数据集。
图3为利用Combine优化机制优化前后作业执行总耗时对比,在图3中可以看到,随着数据集增大,利用Combine机制优化过的作业在总耗时方面时间减少明显,性能平均提高43%,可以充分验证使用Combine优化机制会大量减少Map端最后写到磁盘的数据量,同时也减少了网络中传输的数据量,大幅提高系统性能。
图4为利用Combine优化机制优化前后Map端计算耗时对比,可以看到,利用Combine优化机制优化后,计算机压力虽然大部分在Map端,但是Map端耗时随着数据集增大而减少也很明显。
图5为利用Combine优化机制优化前后Reduce端耗时对比,因为数据求和的实验在利用Combine优化机制优化后,计算压力大部分都在Map端,所以可以看到当数据集增大时,Reduce端执行时间基本都维持在4秒左右。
5 结束语
本文通过对Shuffle流程的详细分析,指出Combine机制在Shuffle流程中的具体执行位置,并详细分析了Combine优化机制执行所需条件和执行过程中存在的问题。通过实验分析,可以看到Combine机制对性能的提升确实很大,但在实际开发应用中应该结合文中所述和实际开发需要,评测是否需要利用Combine优化机制,只有在网络带宽资源有限,对系统的瓶颈比较大时才应该考虑使用Combine机制,以减少数据在网络中的传输量,降低系统对网络带宽的需求。
参考文献:
[1] Tom White.Hadoop权威指南[M].清华大学出版社,2011.
[2] 周一可.云计算下MapReduce 编程模型可用性研究与优化[D].上海交通大学,2011.
[3] Jeffrey Dean, Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters[C],2004.
[4] Wikipedia. MapReduce介绍.2012年09月22日引自http://zh.wikipedia.org/wiki/MapReduce
[5] 王凯.MapReduce集群多用户作业调度方法的研究与实现[D].国防科学技术大学,2010.11.
[6] 徐强,王振江.云计算应用开发实践[M].机械工业出版社,2012.
[7] gaobotian. Hadoop源代码分析(完整版). 2012年09月22日引自http://wenku.baidu.com/view/ffc10130eefdc8d376ee32ec.html
[8] 皮冰锋等.Hadoop开发者第一期.2012年9月22日引自http://www.hadoopor.com/
[9] Lam, C著,韩冀中译.Hadoop实战[M].人民邮电出版社,2011.
[10] StackOverFlow. "Combiner" Class in a mapreduce job. 2012年9月23日引自http://stackoverflow.com/questions/10220371/Combiner-class-in-a-mapreduce-job
[11] 何忠育等.Hadoop开发者第四期.2012年9月23日引自 http://www.hadoopor.com/