APP下载

基于MapReduce框架的重分区连接的优化研究

2016-04-22肖颖

计算机时代 2016年4期

肖颖

摘 要: 重分区连接查询是基于传统MapReduce框架的最常用的连接查询算法之一。在讨论基于传统MapReduce框架的标准重分区连接算法及减小数据缓存的改进算法的基础上,提出了在数据文件分块阶段进行预筛选以精简MapReduce框架中处理的数据量的方法。该方法能有效减少框架内部各个阶段处理的数据总量,进一步压缩缓存的使用空间并降低不同阶段之间数据传输的网络开销。

关键词: MapReduce; 连接查询; 重分区连接; 预筛选

中图分类号:TP312 文献标志码:A 文章编号:1006-8228(2016)04-09-03

0 引言

近年来,随着移动互联网、电子商务及社交媒体快速发展,网络的数据信息量呈指数型增长。为了能更快更好地分析处理这些庞大的数据信息,很多企业选择将数据迁移到价格相对低廉且容错性能较强的云环境[1]中进行处理。MapReduce框架[2]是云计算最为核心的技术之一。作为海量数据的并行处理平台,MapReduce编程模型[3]简单,隐藏了并发、容错、分布式计算和负载平衡等复杂繁琐的细节,并具有较高的可扩展性和容错性,现已广泛应用于海量数据的分析和处理领域。

但在MapReduce框架中,连接查询运算仍然过程复杂、工序繁琐,同时面临数据倾斜、分布式环境数据传输等问题,效率较低。如果能提高MapReduce的连接查询效率,则可进一步提高数据分析效率和用户体验满意度。

本文就现有的基于传统MapReduce框架的重分区连接查询方法进行深入探讨和研究,并进一步讨论可能的优化策略。

1 传统MapReduce框架实现机制

传统MapReduce框架将所有面向海量数据的计算划分成两个阶段:Map阶段和Reduce阶段,每个阶段可由用户自行定义其处理函数,且都以(K,V)二元组的形式进行输入和输出。但由于大部分Mapper与Reducer并非执行在相同节点上,因此MapReduce框架需要一个介于Map函数和Reduce函数之间的Shuffle过程来实现它们之间的数据整理和传输。以下是传统MapReduce框架的具体工作步骤[4]。

⑴ 准备工作

MapReduce框架将大量输入数据分割成M个大小固定的块。

⑵ Map阶段

Mapper读取分配给它的块信息,并从中分离出各条记录。

Mapper从每条记录中抽象出二元组(K1,V1),并传递给用户自定义的Map函数执行生成二元组(K1',V1')。由此块信息经由Map阶段处理得到一个输出序列{(K1',V1'),(K2',V2'),…,(Kn',Vn')},同时这些数据将被存入缓存。

⑶ MapReduce框架的Shuffle过程

(a) 为了使Reduce函数获得有序的输入信息,Shuffle过程负责将Map阶段的输出序列进行排序分组归并,使得具有相同键值K'的数据V'集中在一起,形成(K',list(V')),且list(V')中的值按V'进行排序。因为数据量巨大,所以该阶段可能使用外部排序。

(b) 将处理好的(K',list(V'))发送给Reduce函数。

⑷ Reduce阶段

执行Reduce函数,生成最终的执行结果(K'',V''),并作为输出结果写入文件。

2 重分区连接查询算法及其优化探讨

在数据爆炸的今天,有些大型的互联网公司每天需要利用高达TB甚至PB级别的日志信息来分析数据,以获取有利于其发展的统计信息。但其中大部分操作都是对巨型数据表(例如用户表User和日志表Log)进行连接查询操作:

SELECT User.Col1, User.Col2, …, User.Coln,

Log.Col1, Log.Col2, …, Log.Colm

FROM User, Log

WHERE User.UserID=Log.UserID AND CUser AND CLog

其中CUser表示仅和表User相关的筛选条件,CLog表示仅和表Log相关的筛选条件,User.Col1,User.Col2,…,User.Coln表示表User中的n个列(表User的列数≥n),Log.Col1,Log.Col2,…,Log.Colm表示表Log中的m个列(表Log的列数≥m)。假设若表User 共有mU行,每行的数据量为lenU字节;表Log共有mL行,每行的数据量为lenL字节,则执行该连接查询将面临为(mU×lenU)×(mL×lenL)级别的巨大数据量。

在此我们讨论基于传统MapReduce框架的最常用的连接查询算法之一——重分区连接查询算法[5]。该算法类似于并发数据库管理系统中的分块归并排序连接,同时继承了传统MapReduce框架的容错性能和负载均衡性。

2.1 标准重分区连接算法

标准重分区连接操作在一个单独的MapReduce工作中完成:Map阶段进行数据的预处理,Reduce阶段进行连接查询操作。其具体执行步骤如下。

⑴ MapReduce框架将巨型表User和Log分割成M个大小固定的块。

⑵ 在Map阶段,每个Mapper读取一个块,继而提取出该块中每个记录的连接键值join-key;同时生成含有表标记tag的记录tagrecord,用以识别该记录来自于哪一张表。Mapper输出该块的(join-key, tagrecord)序列并存入缓存。

⑶ MapReduce框架的Shuffle过程对(join-key, tagrecord)序列进行分组、排序和归并。相同join-key的记录被分到一组,并输出给Reducer。

⑷ 在Reduce阶段,每个Reducer首先按tagrecord信息(该记录来自于表User或Log)将输入的记录分为两组,并分别存入各自的缓存BU和BL中,然后将两组信息进行笛卡尔积运算,进而实现查询。

标准重分区连接中存在的问题是:User或Log表中的所有记录都必须写入缓存。然而若|User|<<|Log|时,来自于Log中的记录可能导致内存溢出。

2.2 改进的重分区连接算法

为解决标准重分区连接中可能存在的缓存溢出问题,标准重分区算法可做如下改进。

⑴ 在map函数中,将输出的二元组序列( join-key, tagrecord )改为(join-key-tag, tagrecord),加入表标识tag保证来自于User的记录一定排列在Log的记录之前。

⑵ 在MapReduce框架的shuffle阶段自定义分区函数,使得后续所有计算只根据join-key-tag中的join-key部分来进行。

做出改进后,表User中的记录一定会在Log记录之前,所以只有User中的记录需要存入缓存BU,而Log中的记录则以数据流的形式快速读出并与相关的User中的记录进行连接并输出结果。

改进的重分区连接算法虽然有效地改进了标准算法中的缓存问题,降低了内存溢出的可能,但在mapreduce的shuffle阶段仍需对表User和Log进行排序并通过网络传输数据信息,该操作是连接查询的主要执行开销,会大幅降低其执行效率。

2.3 改进重分区连接算法的预处理

在重分区连接中,如果表User和Log中的数据信息在进行连接操作之前已经按连接键值分区完成,则shuffle阶段的开销就能实现有效降低。该预处理可以通过以下方式实现:表Log在日志记录生成时根据join-key进行分区,而User表则在将其加载到分布式文件系统中时根据join-key进行预分区。从而在查询时,User和Log中相互匹配的分区就能直接进行连接查询。

对比平行关系数据库管理系统,由于分布式文件系统独立决定每一个数据块的存放位置,所以上述方法不能保证表User和Log的相互匹配的分区存放在同一个物理节点中。因此,查询时必须使用直接连接策略。即每个map任务在Log的一个片段Li上进行。在初始化阶段,Mapper从分布式文件系统中取出表User的一个片段Ui,若其尚未进入本地存储系统则将为其建立内存哈希表HUi;然后map函数扫描Li中的每个记录并尝试连接哈希表HUi。由于分区的数量是可选的,因此该方法确保每一个Ui都能装入内存。

3 精简连接查询数据量的预筛选

上述三种重分区连接算法都是从如何减少运算过程中产生的缓存及传输的数据量的角度来提高连接查询效率,但却忽略了连接查询本身的计算数据量的精简,即无论使用上述哪一种算法进行重分区连接查询,其对应的关系代数都没发生实质性的优化,而始终为:

换言之,进入MapReduce框架的数据量即最初分块处理和Mapper都仍然面临着(mU×lenU)+(mL×lenL)字节的数据量,而Reduce阶段的笛卡尔积运算仍将产生具有 (mU×lenU)×(mL×lenL)字节的庞大的中间结果,并需对其进行最终结果筛选。

但根据现实数据的处理情况可知,在MapReduce框架上实现的多个大型表之间的连接运算在大多数情况下仍是等值连接,并且最终从查询结果中获取的也只是其中某几个列的信息。因此,基于MapReduce框架的重分区连接算法还可以通过对大量数据信息进行筛预选处理的方法来降低进行连接查询的数据量,从而进一步减少缓存的使用空间并有效降低shuffle阶段的数据传输造成的网络开销。

根据关系代数优化的典型启发式规则,上述关系代数表达式可优化为:

若查询结果中不包含重复列的信息,则该关系代数能进一步优化为自然连接运算:

其中,为自然连接运算符。

上述优化表达式说明表User和Log在进入MapReduce框架进行连接查询之前,可以先对大量数据进行数据的预筛选,使与结果无关的数据不参与庞大的连接运算。根据传统MapReduce框架的工作原理,该筛选操作可以加载在该框架最初的文件分块阶段中。具体操作步骤如下。

⑴ 将表User分块的同时将其进行一遍扫描:筛选出满足查询条件CUser的行的同时,投影出该行中最终查询结果所需的分量Col1,Col2,…,Coln和连接列分量UserID,构成一个中间结果行,将其存入到分块中。当一个块放满后,将中间结果写入下一个块。

⑵ 同理地,将表Log分块的同时将其进行一遍扫描:筛选出满足查询条件CLog的行的同时,投影出该行数据中最终查询结果所需的分量Col1,Col2,…,Colm和连接列分量UserID构成一个中间结果行,将其存入到分块中。

⑶ 而后再将这些块分配给Mapper进行后续操作。

若表User中满足查询条件CUser的元组共有mU'行,每行所需的分量Col1,Col2,…,Coln和UserID的数据量为lenU'字节;表Log中满足查询条件CLog的元组共有mL'行,每行所需的分量Col1,Col2,…,Colm和UserID的数据量为lenL'字节。则由此可知,进入MapReduce框架的数据量减少为(mU'×lenU')+(mL'×lenL')字节,而最终的连接查询面对的中间结果也减少为(mU'×lenU')×(mL'×lenL')字节。

若有mU'<

4 未来工作展望

本文提出的预筛选的方法在一定程度上能提高整个MapReduce框架的连接查询的执行效率,但其算法复杂度并没有得到质的提升。即若表User或Log中所有的行都分别满足查询条件CUser和CLog,且要求查询两张表连接之后所有列,则预筛选方法对数据信息量的降低将起不到明显作用。后续的研究是对该问题进行深入探讨,以找出降低算法复杂度的方式,从本质上提高整个查询运算的效率。

参考文献(References):

[1] VMware vCAT团队.VMware vCAT权威指南:成功构建云

环境的核心技术和方法[M].机械工业出版社,2014.

[2] 董西成.Hadoop技术内幕:深入解析MapReduce架构设计

与实现原理[M].机械工业出版社,2013.

[3] Donald Miner, Adam Shook. MapReduce设计模式[M].人民

邮电出版社,2014.

[4] Dean J, Ghemawat S. MapReduce: Simplified Data

Processing on Large Clusters[C]. Proc. of OSDI'04. San Francisco: [S. n.],2004.

[5] Blanas S, Rao J, TianY, et a1. A comparison of

joinalgorithms for log processing in MapReduce[C]. Proceedings of the 2010 ACM SIGM0D International Conference on Management of Data,2010.