针对字典序依赖的分布式数据修复
2023-10-09郭乃网谈子敬曹满亮
郭乃网 覃 晟 谈子敬 曹满亮
1(国网上海市电力公司电力科学研究院 上海 200437)
2(复旦大学 上海 200433)
0 引 言
随着数字化技术的广泛应用,为了保证在市场经济环境下系统能安全、可靠运行,各种管理信息系统(MIS)、地理信息系统(GIS)、电网运行的实时信息系统等的广泛应用,产生了海量的工商业运行数据。面对这种海量数据的增加,数据挖掘技术可以发现更深层次的规律。数据质量问题是数据挖掘过程中经常面临的挑战。目前的研究大多着眼于数据挖掘算法的探讨,而忽视了数据分析前对数据质量处理的研究。一些比较成熟的算法对其处理的数据集合都有一定的要求,比如数据完整性好、数据的冗余性少、属性之间的相关性小。然而,实际系统中的数据通常存在各种问题,很少能直接满足数据挖掘算法的要求。同时海量的数据中无意义和错误的成分也普遍存在,严重影响了数据挖掘算法的执行效率。因此,如何提高数据质量已经成为数据挖掘技术应用中的关键问题,引起了学术界的广泛关注[1-2]。
基于数据约束的数据修复技术通过将数据集修改为满足数据约束的形式,来提高和改进数据质量。本文重点研究基于字典序次序依赖(Lexicographical Order Dependency)的数据修复问题。字典序次序依赖[1-2]定义属性列表间的次序关系,因而在SQL的Order by等优化中取得了良好的效果。要充分发挥字典序次序依赖的作用,数据集必须满足给定的依赖定义。这提示我们需要通过数据修复将存在错误和误差的现实数据修复成为正确的满足约束的形式。然而,面对大量数据的修复问题,已有的集中式修复算法在运行效率方面有所欠缺。特别是在面对电力数据等海量规模的数据时,已有方法难以在可接受的时间内完成修复任务。通过分布式修复技术,我们一方面旨在处理由于单机的存储限制,无法有效处理的大数据集,也希望利用分布式的计算方法,通过利用更多计算资源来提升算法在大数据下的运行效率。
针对海量字典序次序依赖的数据修复问题,本文提出一种基于Spark分布式计算框架的字典次序依赖数据修复算法,为数据质量的提高提供一种高效可行的处理方式。该方法充分利用分布式计算的特点,通过将数据和计算分布到不同计算节点,以提高运行效率。
本文提出的分布式计算在通用的真实数据集上进行实验验证。实验结果显示本文算法可以有效地提高计算效率:实际计算时间消耗与数据规模成亚线性关系;并行度的提高显著地减少了计算时间消耗。
1 相关工作
2 字典序次序依赖的定义与符号
字典序次序依赖是定义在属性列之上,在依赖的左右两侧,不像函数依赖等那样是属性的集合,而是属性的有序列,因此字典序次序依赖性质非常特别。我们首先通过表1中的例子,来引出字典序次序依赖的具体内容。
表1 考虑如下与环球旅行有关的数据表r
根据语义,可以得出如下三个次序依赖:
第一条表示,随着编号的递增,航班的日期也会递增;第二条表示,航班的日期递增,会使得总消费递增;而第三条则是函数依赖“城市决定国家”City→Country的次序依赖表述形式,将函数依赖的左侧属性作为字典序次序依赖的左侧属性列,而将函数依赖的右侧属性加在左侧属性之后,作为次序依赖的右侧属性列,则可以得到由字典序次序依赖表示的函数依赖。
根据这三条字典序次序依赖,可以看出表1中的部分元组并不符合这三条次序依赖。例如第五行的[Month,Day]与其他四条元组都同时违背了OD1和OD2。第三行和第四行的City相同,而Country不同,在OD3上产生了冲突。若将第五行的[Month]改为3,并将第四行的[City]改为BUR,则三条字典序次序依赖都得到满足。
3 字典序次序依赖分布式修复算法
3.1 构建最小单元minUnit
修复过程中修改一个元组中一个属性的值,需要的代价为1。为了保证修复后,左侧属性值相同的元组右侧属性值也相同,可以在左侧属性上构建等价类,即所有在A上相同值的元组,构成一个等价类,并且从小到大可以赋予编号ecno。在同一个等价类中,右侧属性B上的值可能不同。如果有w个元组的B值相同,则在最终修复后,这w个元组的B值要么都被修改为同一个新值,要么不变,前者对应修复代价为w,后者对应修复代价为0。而根据次序依赖的定义,最终需要在A上递增时,B上也要递增。A上的递增可以由ecno来反映,而B就无法简单用编号来衡量或者做标记,因此B上的值需要保留在具体的修复过程之中。算法第一部分,需要将分布式的数据,转变为由相同左右属性值的元组组成的最小单元minUnit的集合,每个minUnit中需要含有三个元素:
(1)value=v:表示这些元组的B值。
(2)value=e:表示这些元组的A值在从小到大排列下的序号。
(3)weight=w:表示这些元组的个数,即修复这部分元组需要的代价。
其计算流程如图1所示。输入:数据表(这里每个元组由两个属性值A和B,用空格分隔);输出:分布式下的minUnit。示例参加见图2。
图1 Spark下disMake_minUnit流程
图2 Spark下disMake_minUnit示例图
Spark的特点是对数据不断进行变化操作,因此伪代码和图1都较抽象,因此这里给出一个示例图。图2给出RDD的数据流的详细变化过程,并对每一步操作作出解释。
(1)-(2):从HDFS中读入元组,并转化成键值对(ai,bi)的形式。
(3):将相同a值的元组进行聚合,转化为(ai,[bi1,bi2,…])。
(4)-(5):根据a值排序,并给出等价类编号,转化为(ai,[bi1,bi2,…],ecnoi),此时ecnoi的大小已经可以代替a的大小顺序。
(6):重新将聚合的bi数组拆开,附上ecnoi的信息,转化为(ecnoi,bij,1),最后的1为了后一步的统计做准备。
(7):对所有相同的(ecno,b)的元组进行聚合,并且更改内部顺序为(b,ecno,weight)的格式,由于这里的b对应的是minUnit的value值,所以此时已经产生了需要的minUnit单元格式(value,ecno,weight)。
(8):最后对所有的minUnit进行排序,先根据value值从小到大,value相同的则根据ecno从小到大。具有相同value和ecno的元组会被汇聚在一起,不会存在两个minUnit的value和ecno均相同,minUnit的顺序固定且唯一。
至此,minUnit分布式存储于各个Partitions之中等待后续使用。
3.2 构造不动单元
算法的第二步是在构建的minUnit中找到若干个“可靠的”数据,以方便在第三步中根据不动单元进行数据分割,在分割后的每一段数据上进行修复。“可靠的”数据是指“极有可能未被污染的干净数据”,因此有以下特征:
(1) 所有不动单元上的value随着ecno单调递增,即满足次序依赖。
(2) 每一个不动单元的权重尽可能大。
(3) 不动单元在整个数据集上的分布尽可能平均。
集中式算法可以采用动态规划,而在分布式启发式算法中,我们采用递归算法和二分算法进行替代。
计算流程如图3所示。输入:分成p个部分的minUnit,均衡系数α,筛选层数f,value最小值ub,value最大值lb;输出:不动单元序列L。
图3 Spark下disMake_fixedUnit流程
(2) 在每一个需要进行筛选的Partition之中,挑选value在最小值和最大值之间,并且具有最大weight的minUnit作为这个部分的最佳minUnit。如果有多个minUnit都有最大weight,则随机挑选一个;如果没有minUnit的value符合上述条件,则可以无候选minUnit。
(3) 将分布式下各个部分得到的候选不动单元收集到主节点。
(4) 模仿(2)中的算法,在收集后的候选不动单元中,找到weight最大的,作为此次算法找到的不动单元FixedUnit,其value值为vf;在多次递归之后,可能会出现不存在不动单元的情况,此时说明所有的数据都需要进行调整,而调整的值可以由ub或者lb确定,因此可直接返回。
(5) 将原先的分布式minUnit分成两部分,排在FixedUnit之前的作为upper_minUnit,而排在FixedUnit之后的作为lower_minUnit,两部分的Partition数量可以适当减少。对于两部分拆开的minUnit,分别递归执行disMake_fixedUnit算法,算法参数为:对于upper_minUnit,其value最小值为ub,value最大值为vf,筛选层数为f-1;对于lower_minUnit,其value最小值为vf,value最大值为lb,筛选层数为f-1。两部分同时进行并行操作。
(6) 在两部分分别得到不动点序列后,将两部分序列和选出的FixedUnit有序合并,得到不动点序列,进行返回。
算法初始输入中ub和lb为负无穷大和正无穷大,只要保证在B列的值域两侧即可。初始α可以决定不动点的分布,α偏大,则不动点更集中于中间,时间上会得到优化,但是有可能会使得原本最佳的不动点被直接过滤;α偏小,则二分递归时,时间可能不平均。初始的f决定了最后不动点的数量,由于运行到后期时,可能得不到不动点,所以最终程序返回的不动点序列,其数量小于2f。
3.3 根据不动单元修复数据
在得到n个不动单元之后,可以对数据进行重新分组,根据ecno在不动单元之间的位置,将所有的minUnit分成n+1组,除了首尾两组,每一组均存在于两个不动单元之间。根据不动单元给出的上下界和文献[5]中给出的集中式修复算法,对minUnit进行修复;而对于第一组数据,没有下界,可视作下界为负无穷;对于最后一组数据,没有上界,可视作上界为正无穷。算法1给出修复部分的算法。
算法1OD修复DisFix
输入:不动点列表,分布式minUnit。
输出:修复后的minUnit。
1. Repartition minUnits according to FixedUnit
2. //parallelly,ineach partition
3. rep←α NULL List
4. for mU∈mniUnits do
5. if (mU value 6. mU value←lowerbound 7. remove mU and add mU into rep 8. else if (mU value>upperbound) then 9. mU value←upperbound 10. remove mU and add mU into rep 11. use Fix to repair mniUnits 12. Return rep+mniUnits 算法1主要侧重的是后半部分的修复,而Spark的RDD数据流操作只在前两行出现,后续的步骤都是并行运行于每一个Partition之中,因此这里给出伪代码。第1行minUnit进行重新分组,之后的第3至12行在每一个Partition运行。首先建立一个空列表(第3行),用于存放数值超越上下界的minUnit,对于这部分,直接修改成对应的值即算修复完毕,从原列表移动至rep之中(第5至10行)。这里要注意,重新分组后分成的n+1组数据中,除了首尾两组,其余数据都有上下界,而对于第一组数据,没有下界,可直接跳过第5至7行;对于最后一组数据,没有上界,可直接跳过第8至10行。对于列表中剩下的数据,值处于上下界之间,因此在该节点之中,可以视作为单机的od修复,因此可以使用文献[5]中的修复算法。 实验使用的是一个真实数据集,关于美国航班情况的数据集,这个数据集也在文献[2,5]中被使用。本次实验使用的分布式环境为Spark,版本为2.0.0,主要启动参数见表2。一共进行了三组实验,分别探讨时间与元组数目、分块数目的关系,以及均衡系数α对于整个修复算法的影响。默认实验条件为,元组数目20万行,属性4列,在实验前,对5%的数据添加误差,观察最终的修复情况;而在分布式的参数中,默认分块数目p=10,均衡系数α=0.2。由于分布式下的修复算法并没有其他替代算法,因此只是对自身的实验结果进行分析。 表2 Spark启动参数表 实验1探究的是,不同元组数目下,算法需要的时间。元组数目从10万行增加至40万行时,需要的时间变化情况如图4所示。由于数据量非常大,并且分布式下的算法为启发式算法,所以无法用文献[5]中的集中式算法进行时间和效果上的比较。 图4 实验1的实验结果 可以看出,元组数目增加会导致总时间的增加,但是增加的时间并非线性增加,而是获得了亚线性效果。主要原因有以下几点:首先算法复杂度理论上是线性,计算时间随着元组数目增加,线性增加。但是随着实际数据集的变大,在构建等价类的这一步之中,可能会随着重复数据的增加,改变了每一个minUnit的权重,而对minUnit的数目并没有产生太大的影响,由此造成整体的曲线偏离线性关系。另一方面,当进入算法的第三部分后,需要依赖于文献[5]中的Fix算法,这个算法复杂度是O(nlogn),不过在此之前,有部分的minUnit会因为处在这一个分块的数据范围之外而被直接修改,因此实际参与到Fix的数据个数也有限。如果对于一些数据,虽然在初始时增加了误差,但因为加了误差之后还是没有违背次序依赖,也就不需要被修复,因而总时间下降。 实验2探究的是在选择不同的并行分块个数时,需要的时间变化情况。分块数目从5增加至40,时间与分块数目之间的关系如图5所示。 图5 实验2的实验结果 可以看出,分块数目增加,计算时间减少,基本呈线性下降的关系。这种线性关系是当前大数据分布式计算框架的重要特征,因此说明本分布式算法的设计是合理的。从并行的角度来说,并行数目的增加,可以导致每个机器上的运算量减少,并且数据分配时,数据可以分配得更均匀。而在本文算法中,并行数目的增加,首先使得算法第一部分minUnit的生成速度加快,另一方面,第三部分的数据修复需要和分块数目同样大小的不动单元,而不动单元的数目在第二部分中会随着递归层数的增加指数级上升,因此当并行数目上升时,只需在第二部分中增加很少的递归深度,便能够满足第三部分的不动单元的数目要求。显然,计算时间并不会随着并行数目的增加无限制地减少,甚至可以根据并行算法的时间来猜测,继续增加并行数目,甚至有可能使得并行时间不降反升。主要原因在于忽略了一些基础时间并不会完全被并行算法分担,例如算法中,主节点上的不动点的筛选;同时,并行数目的增加会使得中间数据转移过程中的数据量增加。 实验3研究均衡系数α对于算法时间和修复质量的影响。实验3分成两个部分,当α从0增加至0.4时,测试整个算法的时间;另一方面,由于实验前在数据中增加了一定的误差,因此统计在修复算法结束以后,被修改的单元格数目,作为整个算法的修复质量。数据共有20万行,4个属性,而有5%的数据被加入了误差,因此理论上的错误单元格数量为40 000。但要注意的是,这里的错误数据可能被加入到了次序依赖的左侧属性列之中,而修复算法只是修改右侧的数据,使得修复后的结果满足给出的次序依赖。图6为实验3的结果。 (a) α与时间的关系 从图6(a)中可以看到,随着α的变大,整体的修复时间是变小的。当α较小时,α的增加会大幅缩减算法时间。这与算法中的相关分析是相符的。设计均衡系数α的初衷,便是使得在二分确定不动单元时,尽可能使得不动单元靠近整体数据的中间,一方面能够使得下一部的二分算法的两个部分数据规模相近,另一方面也是使得算法中,在每个子机器中使用Fix算法修复数据时,各个子机器分得的数据规模相近。在分布式中,并行时间取决于自节点的并行时间最大值,因而合理分配各个子机器的数据量是非常重要的。在增加α的同时,可能会造成数据修复质量的下降,因为α的增大有可能会造成可靠的不动单元由于处于数据的两端而直接被筛去,因此需要考虑α对于修复质量的影响,探讨α的增大是否会增加被修改的单元格数目。不过实际情况下,从图6(b)中可以看出,α的增加并没有使得更改的单元格数目大幅上升,而是几乎保持不动。由此可以推断,虽然α增加,但是每一次选择的不动点单元依旧保持着比较高的质量,没有被污染。从这里可以体现出,算法对于数据误差的高容纳性,并且修复效果很好。另外注意到虽然有40 000个单元格被污染,但实际的单元格修改数目大致为47 016,超过最小修改值约17%。这样的结果是可以被接受的,因为算法是针对右侧属性列进行修改,而如果某条元组的次序依赖左侧属性值被误差干扰,则可能会需要修改多条元组的右侧属性值来消去此误差对于次序依赖的影响。 为了提高数据质量中字典序次序依赖修复计算的效率,本文基于SPARK分布式计算框架提出新的字典序次序依赖分布式修复算法,通过数据和计算的有效分布,算法有效改善了大数据集上的运行效率。文章通过实验验证了本文方法的有效性,并说明了算法参数的意义和效果。4 实 验
4.1 实验数据
4.2 时间与元组数目
4.3 时间与分块数目的关系
4.4 均衡系数的影响
5 结 语