CC-MRSJ:Hadoop平台下缓存敏感的星型联接算法*
2013-08-10周国亮朱永利王桂兰
周国亮,朱永利,王桂兰
(1.华北电力大学控制与计算机工程学院 保定 071003;2.国网冀北电力有限公司技能培训中心 保定 071051)
1 引言
随着大数据在各个行业的迅速发展,传统的并行数据仓库系统在处理大数据时由于成本高、可扩展性差及容错能力低等原因而力不从心。因此,基于Map Reduce[1,2]技术的数据仓库系统获得了广泛关注和应用,如Facebook的Hive[3]、Yahoo的 Pig[4]等。然而,困扰 Map Reduce数据仓库系统进一步深入应用的主要问题是性能低下。因此,围绕如何提高Map Reduce系统的性能是目前各方普遍关注的热点难点。在Map Reduce环境下,数据一般以分布式文件系统的形式存储在集群的磁盘上,其主要代价是磁盘和网络I/O,而不同的数据存储格式对系统的I/O有较大的影响,从而影响系统性能。在传统数据仓库系统中普遍采用列存储提高性能[5~7]。通过列存储可以有效地较少I/O,并利用数据压缩技术,进一步提高性能。因此,随着Map Reduce技术的流行,列存储在Map Reduce系统中获得广泛关注,提出了 RCFile[8]、COF[9]和 CFile[10]等列存储文件格式。基于列存储的数据结构,Map Reduce性能获得了一定提高。
在数据仓库中,通常采用星型模式作为数据组织和存储模型。一般情况下,星型模型由一个很大的事实表和多个较小的维表组成。因此在数据仓库中,星型联接(star join)是一项核心和基本的操作,即事实表和多个维表之间的联接。Map Reduce中已有的联接算法[10~12]较少考虑星型联接的特殊性,从而造成事实表数据移动和庞大的中间结果,因此性能很难提高。考虑星型模型的结构,提高性能的关键是尽量减少事实表的磁盘I/O和数据移动,事实表进行本地处理。
另一方面,Map Reduce起初部署在配置较低的机器上,内存与磁盘的存取速度差距不大,较少考虑现代处理器的多层存储体系结构[13],造成CPU缓存利用率低。而目前的Map Reduce普遍部署在配置很高的机器上,内存与磁盘的读写速度差距越来越大,因此充分利用现代处理器的缓存特性[14],有利于进一步提高 Map Reduce算法的性能[15,16]。
本文提出了一种具有缓存敏感特性的CC-MRSJ(cache consc iousMap Reduces tarjoin)算法,首先对事实表进行垂直划分,每列单独存储;维表根据具有的维层次结构划分为不同的列簇;外键列和对应维表采用相关性存储,尽量部署在相同的节点上。通过上述存储结构,可以提高缓存利用率,减少数据在节点间的移动。算法通过两个阶段完成,首先外键列和对应维表进行联接,可以采用map-reduce联接或map联接;然后对中间结果根据位置索引进行reduce联接并输出,利用位置索引随机读取查询中涉及的测度列。通过对SSB[17]数据集的Q3.1和Q4.1测试表明,CC-MRSJ算法具有较高的执行效率。
2 相关工作
2.1 MapReduce及其数据存储格式
MapReduce是由Google公司近年来提出的一种大数据处理框架,MapReduce程序一般分为map和reduce两个阶段。map函数接受键值对(key-valuepair)输入,产生一组中间结果键值对,MapReduce框架会将map函数生成结果中键相同的值传递给一个reduce函数;reduce函数接受一个键以及相关的一组值,将这组值进行合并产生一组规模更小的值。
MapReduce的思想是将计算推向数据,尽量减少数据移动。目前,由Apache基金会组织开发的MapReduce开源实现Hadoop[18]在工业界和学术界获得了广泛应用。
MapReduce中的数据主要存储在磁盘,程序运行中的主要代价是磁盘和网络I/O。因而不同的数据存储格式对性能具有较大的影响,通过列存储可以有效减少I/O,进而提高性能,因此列存储技术在MapReduce中获得了广泛关注,多种列存储模型被提出。列存储中一般通过位置索引(隐式RowId)组织不同列的数据。目前,Hive系统提供RCFile(record columnarfile)[8]格式数据存储,RCFile 首先将数据水平划分为数据块,然后在块内垂直划分为列簇。而CFile[10]不考虑水平划分,而是将表垂直划分为多个列簇。CIF[9]采用每列单独存储,不考虑列簇,但不同列的数据采用相关性存储,从而减少列组合时的网络数据传输。然而这些存储模型并没有考虑星型模型的特殊结构。
2.2 联接算法
联接操作是数据库中的一项核心操作,研究人员对此进行了深入系统的研究。而随着MapReduce对多数据源分析需求的增加,研究人员提出了多种Map Reduce环境下的联接算法,目前Hive系统中主要支持的联接算法如下。
(1)map-reduce联接
在map阶段,两个表根据联接关键字进行划分和排序;在reduce阶段,对每一个分区进行联接操作。这种方式类似于关系数据库的散列联接(Hashjoin)。
(2)map 联接
假如参加联接操作的两个表中有一个足够小,小表被分发到各个节点,于是在map阶段大表的每个片段与小表进行联接。这种方式类似于数据库中的嵌套循环联接(nested loopjoin)。
(3)分块的 map 联接
将参加联接的表划分为对应的数据块,于是在map阶段一个表中的块与另一表中对应的块进行联接,不需要reduce操作。这种方式类似于Sort-Merge联接中的Merge操作。
星型联接是数据仓库中非常普遍的一种操作,通常情况下星型联接是一个事实表F和多个维表D1,D2,…,Dn之间的联接。事实表和维表具有如下特征:
·维表Di的主键PK和事实表F的外键列FKi之间存在对应关系;
·事实表一般由外键列和测度列组成:F(fk1,fk2,…,fkn,m1,m2,…,mk)。
为了在Map Reduce环境下高效处理星型联接,相关研究人员提出了并发联接(concurrent join)[10]和Scatter-Gather-Merge(SGM)[12]算法。这两个算法的核心思想都是把星型联接划分为多个小的联接,然后再合并中间结果。并发联接算法采用垂直划分,SGM采用水平划分。但两者未考虑事实表和维表相关性存储及缓存特性。
3 C C-MRSJ算法
3.1 数据存储格式
列存储一般将所有列划分为不同的列簇,经常一起访问的列组成一个簇,从而减少了列组合实体化(materialization)的代价,也避免了访问不相关的列。但是,预测哪些列经常一起访问是非常困难的,因此需要构建多种列组合模式的列簇,某些列会多次出现在不同的列簇中,从而造成存储空间浪费。考虑到星型模型的结构,事实表很大而且列数较多,而预测哪些列经常一起访问存在一定困难,所以事实表采取每列单独存储方式。而用户对维表的访问,一般是和某个维层次(dimens ionhi erarchy)相关的,因此维表采用列簇存储,根据维表具有的维层次划分为多个列簇,每个列簇包含主键和对应的维层次属性。比如SSB[16]中的customer维表可以划分为如下几个列簇:{[CUSTKEY、NAME]、[CUSTKEY、CITY]、[CUSTKEY、NATION]、[CUSTKEY、REGION]、[CUSTKEY、其他列]}。划分后的 customer表如图1所示。
通过这种方式有效地减少了联接操作中的磁盘I/O;同时无论是事实表还是维表只访问需要操作的列,提高了CPU缓存利用率,使算法具有缓存敏感特性。
事实表的外键列占用存储空间远远大于维表,比如SSB数据集在SF=10时,事实表外键列lo_custkey占用存储空间约为1GB(含位置索引),如果Hadoop的块大小为64MB,采用3份复制策略,则lo_custkey需要48块存储,可以分布到小规模集群中的大部分节点。而维表customer的大小约为28MB,Hadoop会采用一个数据块存储,最多只能分布在3个节点上。因此对事实表和维表采取不同的复制策略,要求维表的复制数至少为nd=size(fki)/64。另外,星型模型中事实表的每个外键列一般情况下只需要与对应的维表进行联接操作,事实表和维表采取相关性存储,也就是事实表外键列和对应维表的数据块存储在同一节点,从而减少联接操作过程中从其他节点拉取数据的情况,减少网络I/O。存储格式可以用图2表述,其中D2-hn表示维表列簇块,fki表示外键列的分块。
3.2 算法描述
基于上述存储结构,本文提出了CC-MRSJ算法。将星型联接划分为两个步骤完成:首先每个维表和对应的事实表外键列进行联接;然后对产生的中间结果进行联接并读取测度列的值,从而得到最终结果。执行过程如图3所示。
图1 维表的列簇存储模式
图2 星型模型存储模式
图3 星型联接执行计划
可以用式(1)表示:
在算法的第一个阶段中,维表和外键列的联接操作根据集群中的计算资源可以并行或串行。所谓并行,就是多个外键列和对应的维表同时进行联接;串行是一次一个维表和一个外键列联接。通过并行可以进一步提高算法的效率。
根据维表的存储和分布特点,联接方式可以是map-reduce联接,也可以是map联接。如果维表的全部数据可以在一个节点获得,则外键列的一部分与维表联接,使联接可以在map阶段完成,否则需要map-reduce联接才能完成。
这一阶段的输出结果是以位置索引为key的键值对,值为联接需要从维表中读取的某列的值。这一阶段是CC-MRSJ算法的主要代价。
第二个阶段对位置索引进行排序,然后计算每个位置索引的个数(count),如果个数与参加星型联接的维表个数相等,则将此位置索引对应的值组合输出。对于测度列,根据位置索引以随机访问的方式读取对应位置的值。
下面以SSB数据集中的Q3.1为例,说明算法的执行过程,Q3.1涉及多个查询条件,是一个事实表和3个维表的联接。只考虑联接操作,因此去掉了groupby操作和聚集计算,该查询语句具有如下形式:
SELECT c_nation,s_nation,d_year,lo_revenue
FROM customer
JOIN lineorder ON lo_custkey=c_custkey
JOIN supplier ON lo_suppkey=s_suppkey
JOIN ddate ON lo_orderdate=d_datekey
WHERE c_region= ’ASIA’
AND s_region= ’ASIA’
AND d_year>=1992 and d_year<=1997;
详细的执行过程如图4所示。
图4 星型联接的例子
首先,外键列和对应维表的列簇进行联接,生成结果的键值对是位置索引和维表中维层次的值;如图4所示,3个维表分别与对应的外键列联接。
然后,每个中间结果作为输入进行reduce计算,根据位置索引进行排序,如果同一位置索引的值达到3则输出;如图4中位置索引为5的纪录满足3个查询条件,是最终结果集中的一条记录。
最后,根据上一步结果中的位置索引,到测度列中以随机访问的方式读取对应位置的测度值,并与结果集合并,形成联接操作的最终结果。
CC-MRSJ算法的形式化描述如下,这里省略了算法的解释。
map(k,v)//mapsidejoin:维表存储在分布式缓存中DistributedCache
输入:
kisakey//position:位置索引,隐式或显式RowId
3.3 算法分析
影响CC-MRSJ算法的主要因素是算法第一阶段的磁盘I/O和网络数据传输。其中磁盘I/O主要包括原始数据的读取和中间结果的写入、读取;而网络数据传输主要是map的结果向reduce节点的传输。事实表外键列所需存储空间比维表要大很多,而且一般情况下维表和外键列联接后的结果是过滤后的数据,相对于原始数据需要更少的存储空间。因此,算法的主要代价在第一个阶段,而第一个阶段是由多个相似的事实表外键列和维表层次列簇之间的联接,而此操作具有相似的代价,所以算法的代价与联接中维表的个数,也就是外键列的个数成正比:
其中,p表示代价,k表示比例因数。
另外,如果集群的规模足够大,可以支持多个维表和事实表外键的并发联接操作,算法可以保持近似的执行效率,即算法具有横向扩展特征。
目前,计算机普遍配置有很高的内存容量,因此可以利用大内存优化算法的性能。如果维表的列簇可以存储在节点的内存中,将维表分发到每个节点的本地内存中,则外键列和维表之间的联接可以通过map联接方式完成,从而获得更高的效率。Hadoop的最新版本也提供了分布式缓存(Hadoopdistributedcache)功能,为应用程序充分利用大内存提供了便利。
4 实验结果与分析
4.1 实验环境
本实验采用SSB数据集,SSB数据集是由TPC-H数据集演变而来,其中包括一个事实表(LINEORDER)和4个维表(CUSTOMER、SUPPLIER、PART 和 DATE)。当设置 SF=10时,事实表大约为6GB,可以划分为约100个64MB的数据块,每个数据块配置2个备份,对于包含有限个节点的小集群,数据可以均匀地覆盖整个集群。数据以文本文件的形式存储在磁盘上。对于事实表的每列只存储列值,在map操作中,输入的键值对为位置索引和对应位置的列值。
目前Hive算法支持星型联接,CC-MRSJ算法与Hive进行比较,Hive的版本采用0.8.1,数据存储格式包括TextFile和 RCFile。
实验室搭建了包括6个节点的MapReduce测试环境,主节点配置的机器是DELLPowerEdgeT310,使用Intel XeonCPUX3430@2.40GHz处理器和2GBRAM,系统LinuxUbantu版本。数据节点是Lenovo计算机,配置Intel PentiumD处理器和2GBRAM。采用的Hadoop的版本为0.20.2。
SSB的Q4.1查询语句在去掉分组和聚集运算后的格式如下:
SELECTc_nation,p_name,s_nation,d_year,
lo_revenue,lo_supplycost
FROM lineorder
JOIN customer ON lo_custkey=c_custkey
JOIN supplier ON lo_suppkey=s_suppkey
JOIN part ON lo_partkey=p_partkey
JOIN dwdate ON lo_orderdate=d_datekey
AND s_region= ‘ASIA’
AND d_year>=1992 and d_year<=1997
AND(p_mfgr= ‘MFGR#1’or p_mfgr= ‘MFGR#2’);
此查询是一个事实表和4个维表的联接操作。
另外,Hive需要将SQL语句转化为MapReduce程序,目前Hive转化的后程序还存在较大的优化空间,于是一些研究人员设计了SQL转MapReduce的第三方程序,比如YSmart[19,20]等,以期提高 MapReduce程序的效率,但这些转化只是语句方面的优化,并未考虑数据存储格式,所以算法效率和Hive转化后的程序并没有较大提高,所以本文直接与Hive系统进行比较。
4.2 实验结果
首先对SSB数据集上去掉聚集运算的Q3.1和Q4.1查询进行比较测试,分别与Hive中的以TextFile和RCFile存储格式的联接算法进行比较,实验结果如图5所示。
图5 原始查询
对于星型联接,目前Hive处理的方式是事实表先与其中一个维表联接,然后中间结果和另一个维表进行联接,直到所有维表处理完毕,即((F∞D1)∞D2)…∞Dn,这样会产生很大的中间结果,因此性能很难提高。另外,在测试环境中Hive系统采用TextFile和RCFile存储格式对算法性能的影响有限,如图5(a)所示,所以在Q4.1的测试中省略了RCFile的执行时间。
对于某些联接操作,不需要读取测度列,因此进一步测试了星型联接中去掉测度列的情况,测试结果如图6所示。
图6 不包括测度列
由于CC-MRSJ算法对测度列单独处理和存储,联接中如果没有测度列,算法不需要访问测度列,从而需要更少的磁盘I/O,而Hive系统无论采用TextFile还是RCFile都需要访问测度列,从而造成额外的I/O。因此,CC-MRSJ获得了更高的加速比。
某些情况下,联接查询不包括WHERE查询条件,于是测试了CC-MRSJ将Q3.1和Q4.1中的WHERE条件省略后的效率,测试结果如图7所示。
在这种条件下,由于没有过滤条件,算法需要读取的数据量比包含WHERE条件的联接操作要多,所以算法执行的时间更长。但是CC-MRSJ通过数据划分及相关性存储和时延实例化,从而有效地减少了I/O,保证了算法执行的高效率。因此,CC-MRSJ算法同样优于Hive算法。
5 结束语
图7 不包括WHERE条件
本文初步探讨了在MapReduce环境下利用合理的数据划分和组织格式,提高星型联接算法执行效率,算法设计过程中也充分考虑了现代处理器的缓存特性。通过实验比较,算法与Hive中的联接算法相比,获得了近似两倍的性能提升。另外,现代处理器普遍具有多核和大内存,而目前的MapReduce系统对此资源利用不足,因此,如何进一步利用现代处理器特征,提高MapReduce算法的执行效率,使MapReduce算法在横向扩展的同时利用处理器的纵向扩展特征将是未来的研究方向之一。
1 Dean J,Ghemawat S.MapReduce:simplified data processing on large clusters.Communications of the ACM,2008(1)
2 Chang F,Dean J,Ghemawat S,et al.Bigtable:a distributed storage system for structured data.ACM Transactions on Computer Systems,2008(2)
3 Thusoo A,Sarma J S,Jain N,et al.Hive-a warehousing solution over a MapReduce framework.Proceedings of the VLDB Endowment,2009,2(2):1626~1629
4 Gates A,Natkovich O,Chopra S,et al.Srivastava,building a high level dataflow system on top of MapReduce:the pig experience.Proceedings of the VLDB Endowment,2009,2(2):1414~1425
5 Stonebraker M,Abadi D J,Batkin A,et al.C-store:a columnoriented dbms.Proceedings of the 31st International Conference on Very Large Data Bases,Trondheim,Norway,2005:553~564
6 Abadi D J,Madden S,Hachem N.Column-stores vs row-stores:how different are they really.Proceedings of the 2008 ACM SIGMOD International Conference on Management of Data,Vancouver,2008:967~980
7 Ailamaki A,DeWitt D J,Hill M D,et al.Weaving relations for cache performance.Proceedings of the 27th International Conference on Very Large Data Bases,Roma,2001:169~180
8 Lee R,Yin H,Zheng S,et al.RCFile:a fast and space-efficient data place-ment structure in MapReduce-based warehouse systems.ICDE 2011,Hannover,Germany:2011:1199~1208
9 Floratou A,Patel J M,Shekita E J,et al.Column-oriented storage techniques for MapReduce.Proceedings of the VLDB Endowment,2011(7)
10 Lin Y T,Agrawal D,Chen C,et al.Llama:leveraging columnar storage for scalable join processing in the MapReduce framework.Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data,Athens,Greece,2011
11 Blanas S,Patel J M,Ercegovac V,et al.A comparison of join algorithms for log processing in mapreduce.Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data Indiana,USA,2010:975~986
12 Han H,Jung H S,Eom H S,et al.Yeom:scatter-gather-merge:an efficient star-join query processing algorithm for data-parallel frameworks.Cluster Computing,2011,14(2):183~197
13 Rao J,Ross K A.Cache conscious indexing for decision-support in main memory.VLDB,Edinbargh,Scotland,1999:78~89
14 Brewer E A.Towards robust distributed systems.Proceedings of the Nineteenth Annual ACM Symposium on Principles of Distributed Computing,Portland,Oregon,2000
15 Zhang S B,Han J Z,Liu Z Y.Accelerating MapReduce with distributed memory cache.ICPADS 2009,Shenzhen,China,2009:472~478
16 Shinnar A,Cunningham D,Saraswat V,et al.M3R:increased performance for in-memory Hadoop jobs.Proceedings of the VLDB Endowment,2012(5)
17 O’Neil P,O’Neil E,Chen X.The star schema benchmark,http://www.cs.umb.edu/~poneil/star.SchemaB.PDF,Minneapdis,2007
18 Apache Hadoop.http://hadoop.apache.org/,2012
19 Lee R,Luo T,Huai Y,et al.YSmart:Yet another SQL-to-MapReduce translator.Proceedings of the 31st International Conference on Minneapolis,MN,USA,2011:25~36
20 Huai Y,Lee R,Zhang S,et al.A matrix model for analyzing,optimizing and deploying software for big data analytics in distributed systems.Proceedings of the 2nd ACM Symposium on Cloud Computing,Cascais,2011