基于MapReduce的等值连接中数据倾斜问题研究
2017-03-17褚龙现
褚龙现
摘要:针对MapReduce计算框架下实现数据表等值连接时不能很好地处理数据倾斜的问题,详细分析了数据倾斜带来的任务负载不均匀问题和解决思路,结合两表之间传统连接算法和广播连接算法思想,提出将倾斜数据和非倾斜数据区别对待的分区连接算法。实验结果表明,提出的算法很好地解决了数据倾斜问题下任务负载均衡问题,有效提高了两表之间等值连接查询效率。
关键词:数据倾斜;连接;MapReduce;分区
中图分类号:TP311 文献标识码:A 文章编号:1009-3044(2016)32-0226-03
Research on Data Skew in Equal-join based on MapReduce
CHU Long-xian
(Computer School, Pingdingshan University, Pingdingshan 467000, China)
Abstract: Aiming at the problem that data skew cant be handled well when data table is joined by MapReduce, this paper analyzes the problem of task load unevenness and solution in detail. Combining the traditional join algorithm between two tables and broadcast join algorithm, we propose a partitioning algorithm that treats skewed and non-skewed data differently. The experimental results show that the proposed algorithm solves the problem of task load balancing under data skewing and improves the query efficiency of equal-join between two tables.
Key words: Data skew; join;MapReduce;partition
1 引言
计算机网络技术的不断发展和社会信息化建设的不断加强促使数据规模的快速增长,PB级大小的数据管理在很多行业成为常态。随着大数据的出现,数据处理必然需要分布式计算。MapReduce是Hadoop平台的并行计算框架,可以实现对存储在HDFS(Hadoop Distributed File System)中的大数据进行分布式处理,提高大数据处理效率[1,2]。在网络应用中,搜索和数据库查询领域离不开连接操作[3],所以研究MapReduce下数据连接算法的优化有着重要意义。
MapReduce提供了数据连接并行处理的基本算法,但是在同一个数据集中经常出现某些数值大量出现,导致数据分布不平均问题,利用现有算法将导致节点负载不均衡[4,5]。本文以两个数据表连接为目标,研究数据倾斜情况下大数据连接操作的算法优化问题,提出分区连接算法,提高连接执行效率。
2 相关工作
2.1 MapReduce
MapReduce是基于Hadoop平台的并行计算框架,使用map和reduce函数实现数据的分布式处理,map函数对分片数据执行读取、分区、排序和合并后提交到reduce执行[6]。在执行任务过程中,由master节点负责系统控制和任务分配等工作,slave节点执行具体任务。MapReduce工作流程如图1所示。
图1 MapReduce工作流程
2.2 连接查询
根据连接运算执行的时机,Hadoop下两表连接主要有分为map端连接和reduce端连接两种[7]。
map端连接适用于参与连接的其中一个表大小可以缓存到内存中,广播连接是常见的一种算法实现,主要通过将小表广播到所有map节点,然后与每个节点中存储的另一个表的数据块进行连接,将结果写入HDFS中。
reduce端连接可以是标准Hash连接也可以是先通过半连接预处理数据后连接。Hash连接主要在map函数将标记有数据来源的元组按照连接属性进行Hash划分,完成shuffle后,相同连接属性的两个表数据会划分到同一个reduce中,在reduce端完成连接操作。
半连接应用于大表和小表连接的场景,通过小表中的连接属性对大表参与连接的数据进行预处理过滤,以此减少参与连接的大表数据量[8]。在MapReduce中应用半连接算法需要三轮map和reduce过程,分别完成小表连接属性获取、大表数据过滤和最后reduce的连接。
2.3 数据倾斜
数据倾斜主要描述的是数据表中某个特定值出现的频率远远大于其他值,在分布式存储情况下,参与连接运算的数据表数据倾斜将会对查询执行效率产生巨大影响。因为数据源数据倾斜导致在MapReduce计算框架中,默认的分区策略使reduce各任务接受的数据量可能不均衡,从而出现負载重的reduce一直处于工作状态,整个任务完成时间大大增加[9]。
3 负载均衡连接算法
3.1负载均衡处理方案
reduce任务负载不均衡主要由于MapReduce简单地对连接属性(key)进行Hash导致,为此可以优化Hash函数,将key值按照区间进行划分,相同区间的数据分区到同一个reduce中,最终使得reduce任务负载趋于均衡。实际应用中主要有两种区间分区,一种是简单区间分区,一种是虚节点分区[10]。
3.1.1区间分区
(1)简单区间分区
设定reduce的数量为n,区间分割点集为{r1,r2,…,rn-1},所有数据按照key在分割点集中的位置分为n个部分,则自定义Hash函数可以将数据划分为n的区间段,每个reduce处理一个区间段中的数据,以此实现reduce任务的均衡。
(2)虚拟节点分区
设定reduce的数量为n,区间分割点集为{r1,r2,…,rk*n-1}(k=1,2,…,m),所有数据按照key在分割点集中的位置分为k*n个部分,从k*n个区间段中依次取出n个执行Hash函数重分区到reduce中,直至全部取完,每个reduce处理一个区间段中的数据,以此实现reduce任务的均衡。
3.1.2區间分区实现
区间分区的实现主要是分割点集的确定,对于大数据集合,首先进行随机采样得到小样本,再进行排序并选取区间分割点集,最终在一次MapReduce任务中完成分割。分割点集确定过程如下:
第一步:对连接key进行采样,利用map函数对key按照比例进行过滤,将过滤后的样本key发送到同一个reduce中;
第二步:在reduce中得到key排序后的结果集合K,长度为L,则区间分割点集合为
R={ri|ri=kj, 1≤i≤n-1, j=1+i*(L/n),kj∈K }
其中,n为连接操作设定的reduce的数量。
3.2 分区连接算法
当待分割数据中某个值数据量特别大时会导致该数值横跨多个数据区间,最终大量相同数值的数据分区到同一个reduce,影响性能。为此,针对数据是否倾斜提出不同解决方案。
设定两个关系分别为R(M,N)和S(N,P),其中N是连接key。定义两个集合Lr和Ls,Lr包含R.N中高频数据,Ls包含S.N中高频数据。利用新的分区算法完成两个表的连接操作算法如下:
步骤1:R(M,N)和S(N,P)随机分区到n个reduce中,每个reduce中数据为Ri和Si,其中1≤i≤n;
步骤2:在第i个reduce中将Ri分解为三个集合,分别为:
Ri- loc ={r|r∈Ri, Ri.N∈Lr},Ri-bro={r|r∈Ri, Ri.N∈Ls},Ri-hash={r|r∈Ri- (Ri-local ∪Ri-bro) }
步骤3:将每个reduce中的Ri-bro集合发送到所有reduce中,将Ri-hash集合Hash到指定的reduce中,最终每个reduce上得到三个集合:
Ri-local= Ri-loc,Ri-broadcast =∪1≤j≤nRj-bro,Ri-other={r| h(r.N)=i,r∈∪1≤j≤nRj- hash }
步骤4:在每个reduce对S执行步骤2和步骤3
步骤5:在每个reduce节点执行
Ri-local∞Si-broadcast∪Si-local∞Ri-broadcast∪Ri-other∞Si-other
步骤6:将结果写入HDFS中。
4 实验
实验在云平台上虚拟4个节点组成Hadoop集群,主节点一个,从节点三个。每个节点为2.6GHZ的双核CPU,8GB内存,64位的CentOS 6.6操作系统,Hadoop版本为2.6.0。
实验数据集使用TPC-H测试集工具生成,测试customer和orders两个数据表的并行连接操作,设定customer数据量为2千万行,orders数据倾斜率设定为0.3和0.6两种,连接率为100%,数据量的选取如表1所示。
reduce端虚拟节点分区连接算法为VQ,本文提出的分区连接算法为PQ,则在两种倾斜率下执行连接操作时间如图2所示。
图2中显示在数据倾斜情况下,随着数据量增加PQ算法执行时间比VQ算法执行时间减少幅度也会增加;随着数据倾斜率的增大,两种算法执行时间会增长,但是PQ算法增加幅度较小。
5 结束语
本文研究的重点是优化分布式数据连接算法,致力于解决数据倾斜问题对连接算法的影响。针对传统区间分区算法无法处理倾斜数据问题,提出将倾斜数据和非倾斜数据区分对待,实验结果表明,提出的分区连接算法在大数据集连接操作上很好地解决了数据倾斜带来的负载不均衡问题,提高了数据查询效率。
参考文献:
[1]Qin X P, Wang H J, Xiao-Yong D U, et al. Big Data Analysis—Competition and Symbiosis of RDBMS and MapReduce[J]. Journal of Software, 2012, 23(1):32-45.
[2]赵彦荣,王伟平,孟丹,等.基于Hadoop的高效连接查询处理算法CHMJ[J].软件学报,2012, 23(8):2032-2041.
[3]高宇飞,曹仰杰,陶永才,等.MapReduce计算模型下基于虚拟分区的数据倾斜处理方法[J]. 小型微型计算机系统, 2015, 36(8):1706-1710.
[4]Gufler B,Augsten N,Reiser A,et al.Handing data skew in mapReduce[C].Proceedings of the 1 st International Conference on Cloud Computing and Services Science,2010,146:574-583.
[5]Kwon Y C,Ren K,Balazinska M,et al.Managing skew in Hadoop[J].IEEE Data Eng,Bull,2013,36(1):24-33.
[6]YANG G. The application of MapReduce in the cloud computing[C]// Proceedings of the 2011 2nd International Symposium on Intelligence Information Processing and Trusted Computing. Piscataway: IEEE, 2011: 154-156.
[7]Blanas S,Patel J M,Ercegovac V,et al.A Comparison of Join Algorithms for Log Processing in MapReduce [C].SIGMOD 10,Indianaplis,Indiana,USA,2010:975-986
[8]金健,陈群,赵保学.数据倾斜情况下基于MapReduce模型的连接算法研究[J]. 计算机与现代化, 2013(5):22-27.
[9] Kwon Y, Balazinska M, Howe B, et al. A Study of Skew in MapReduce Applications[J]. Open Cirrus Summit, 2011.
[10]Atta F, Viglas S D, Niazi S. SAND Join — A skew handling join algorithm for Google's MapReduce framework[M]. IEEE, 2011.