Spark框架下地震属性处理方法研究
2018-08-29朱丽萍王建东李洪奇赵艳红
朱丽萍 王建东 李洪奇 赵艳红
(1.中国石油大学地球物理与信息工程学院 北京 102249)
(2.中国石油大学(北京)石油数据挖掘北京市重点实验室 北京 102249)
1 引言
地震属性是指由叠前或叠后地震数据经过一系列数学变换而获得的描述地震波的几何学、运动学、动力学或统计学特征,它能够在不同程度上反映地层地质的信息[1]。层曲率地震属性为地震属性的一种,它描述了地震反射面的几何变化,曲率属性需要计算二阶导数,而这个过程对原始地震数据中的任何噪声信息都十分敏感,因此通常在计算曲率属性前会进行滤波,常见的有中值滤波、加权平均滤波、高斯迭代滤波等[2]。随着信息技术的不断发展和地震属性研究的不断进步,三维地震勘探已得到广泛普及和应用,这使得地震属性数据量飞速增长,针对地震属性数据进行的分析变换也变得越来越复杂,而目前单机的存储和处理能力已经无法满足大批量的地震属性数据分析计算的要求,针对此问题,目前比较常见的解决方案是使用分布式文件系统进行存储,然后基于GPU、MPI、MapRe⁃duce等分布式并行计算框架进行分析计算。随着GPU的可编程性的不断增强,GPU在并行计算方面的优势越发突出,GPU在浮点运算等方面的性能远超 CPU,MPI(Message Passing Interface)是一种基于消息传递的并行编程技术[3]。本文主要对比了Hadoop MapReduce与Spark框架的计算原理,并使用两种框架分别对曲率地震属性进行处理比较,提出一种基于Spark框架的地震属性处理方法。
MapReduce作为目前应用广泛的并行编程架构,借助函数式编程的思想将并行的实现切分为Map和Reduce两个阶段,Map阶段完成键值对的映射,Reduce阶段完成根据键值得规约,编程人员可以自定义自己的map和reduce函数以实现具体的功能。MapReduce作为Hadoop的上层处理架构已经被广泛使用于海量数据的分析处理中,同时Ha⁃doop底层的分布式文件系统HDFS以其高容错性和高吞吐率为上层的并行处理提供了保障。
Spark是一个类似Hadoop MapReduce的通用的并行计算框架,Spark基于map与reduce的算法使得它拥有Hadoop MapReduce的优点,但其中间结果常驻内存的设计相比Hadoop MapReduce读写HDFS使得其在迭代计算和机器学习等方面的性能远超后者。Spark中的核心概念是分布式弹性数据集(RDD),它是一种分布式的内存抽象,使得大量的计算过程都在内存中完成同时又保证了MapRe⁃duce等数据流模型的容错性[4]。MapReduce由于Disk—IO和高冗余性等使得其在迭代式计算和交互式数据挖掘方面的性能都逊于Spark。而地震属性一方面由于属性体数据量较大对单机的存储和处理产生了负担,另一方面在计算曲率属性体的过程中会涉及到多次迭代滤波过程,传统的单机受到处理核心的限制对一个50G左右的地震属性体进行三次迭代高斯滤波大约需要数十个小时的时间,这导致了整个处理过程的时间成本太高。
本文提出了一种基于Spark的地震属性数据分析处理的方法。通过将GB级别的地震属性体存储在分布式文件系统HDFS中,然后利用Spark计算框架将HDFS中的地震属性数据以RDD的形式读入内存,经过多次迭代高斯滤波处理并按特定的曲率属性数据格式输出,大大加快了曲率地震属性计算的过程。实验证明随着数据量的上升并行处理框架的效率远高于单机处理过程,同时随着迭代次数的增大,Spark在地震属性数据迭代计算上的效率也明显快于Hadoop MapReduce。
2 地震属性数据的格式及存储
地震属性数据通常是以SEGY格式存储的,一个完整的地震属性体由文件头和地震道数据组成。文件头的前3200字节使用EBCDIC编码的记录了地震数据采集系统的信息,紧接着的400字节记录了整个地震体的道数,每一道的采样间隔等。地震道数据是由道头和道体两部分数据组成,道头共240字节,记录了这一道的坐标和采样点数等,道体则使用32位IBM浮点数记录了这一道的具体地震属性数据。每个地震属性体因其范围的不同会包含若干道数据,常见的一个三维地整体会包含上千道数据,所以一个地震属性体的数据量就可能高达数十GB,而单机地震属性数据计算是通过串行或者多线程并行方式一道道的读取数据然后做相应的数学变换而得到的,这对于大的地震属性体来讲将是个非常耗时的过程。
面对如此大数据量的处理任务,光有并行的处理框架是不够的,还需要一个支持其运行的分布式文件系统。HDFS是Hadoop的分布式文件系统,它采用主从节点和数据分块的思想可以用来存储TB级别的数据以满足上层的并行处理的需求,HDFS由一个NameNode节点和多个DataNode节点组成,NameNode负责管理文件的元数据和处理对客户端的文件访问请求,DataNode负责存储具体数据块[5]。HDFS使用文件切块的思想将一个大的文件切分为固定大小的多个小文件块并采用冗余备份的方式存储在多个DataNode中。HDFS以其高吞吐量、高容错性而被广泛适用于大批量数据的存储,但HDFS不适合处理小文件或是处理的数据来自多个文件,这是因为大量的开销用于处理网络通信传输或是文件读写而造成整个集群效率低下。大部分针对HDFS的使用都是在特定的业务背景下改变数据在文件中存储格式以便于被上层的并行框架处理。文献[6]通过专辑、作者、年代等信息将相关MP3文件合并到Sequence File文件中,提高了文件访问效率;文献[7]通过把距离接近的Map⁃GIS K9文件存放到MapFile中来提高查询效率。以上研究均对应着特定的应用,不适合用来存储地震属性数据。
3 Spark框架与地震属性分析技术
3.1 Spark与Hadoop MapReduce计算模式对比
随着信息技术的发展,地震数据的采集量飞速上升,而基于单机的地震数据的分析工作也变得越发困难,分布式并行处理框架的出现很好地解决了这个问题。目前较为成熟的有Hadoop的MapRe⁃duce和基于RDD的Spark框架,两种框架都可以用于大批量地震数据集的处理与分析,然而由于其特性的不同又分别适合于不同的场景下。本文使用Spark作为平台的并行处理引擎是考虑到数据量和计算类型的综合结果,同时实验也证明了在计算曲率地震属性时基于Spark的处理分析性能要比Ha⁃doop MapReduce高出接近一个数量级。
Hadoop MapReduce是将所要进行的计算任务分解为map和reduce两个阶段来完成,map阶段完成数据到键值对的映射处理,reduce阶段则完成数据的规约整合,用户可以在对应的map和reduce函数中实现自己的业务处理过程。在map阶段存在内存缓冲区到硬盘的溢写过程,这个过程主要是将map的处理结果根据key值写入将要处理的不同reduce分区中去[8],而其是在map阶段的结束时会进行一次节点内部数据的合并,这个叫做combine的过程主要完成对单个节点上map阶段输出结果的一个合并整理以减少map与reduce阶段之间数据的网络通信传输量和和磁盘的写入量,然而对于需要频繁的迭代计算的处理类型,这种中间结果写入磁盘的模式必将带来大量的IO消耗,这些消耗相对于整个任务处理过程将是不容忽视的一部分。
相比与Hadoop MapReduce,Spark处理引擎很好的解决了频繁磁盘IO的问题,Spark可以运行在Hadoop YARN上,也可以配置在Mesos上,支持从HDFS数据源读入数据并行处理。Spark使用了分布式弹性数据集(RDD)的概念使得整个数据处理过程都在内存中进行,Spark突破了map和reduce两种操作的局限,提供了多种多样的API供数据的转化和处理,而数据与数据间的转化也是以RDD的形式变换的,这就保证了每一步得到数据的并行性,另外RDD常驻内存可以避免大量的磁盘IO,特别适合需要频繁迭代的计算任务,图1展示了Spark和Hadoop在迭代计算时的区别。同时Spark的执行过程使用有向无环图(DAG图)将数据间的变换过程记录下来,数据间的转化都属于transfor⁃mation操作,DAG图在逻辑上会得到一个输出结果,当用户需要将结果输出时,便会激活Spark的action操作,此时Spark会根据RDD间的依赖生成DAG图,然后依次完成计算过程,并输出最终的结果,Spark执行任务时可以根据DAG图将任务划分为多个任务集即Stage,真正提交到计算节点计算的实际上是不同的 Stage[9~10]。强大的内存计算能力和避免不必要的反序列化操作使得Spark在迭代式计算时比Hadoop的性能高出一个数量级。
图1 Hadoop与Spark迭代计算过程对比
3.2 曲率属性二维高斯迭代滤波算法
曲率地震属性是地震属性的一种,它记录了地震反射面的几何变化过程,曲率是描述曲线上任意一点弯曲程度的量,曲率越大则表明曲线的弯曲程度越大,二维的空间曲率可以看作是平面切割层面得到,由于在求取曲率过程中涉及到二阶导数,而求二阶导数的过程对噪声的敏感性较高,所以通常会先要对其进行滤波处理,常见的滤波方式包括中值滤波、高斯滤波等,实验中需要对地震属性进行高斯迭代滤波进而再求的曲率地震属性[11~12]。
目前的单机多线程处理计算流程如下:
1)启动读数据线程依次从原始地震体数据中读取每一道道数据。
2)创建线程池,将滤波计算线程加入线程池,依次获取读数据线程得到的到数据并进行二维高斯滤波计算。
3)对上一步滤波得到的结果判断是否满足迭代截止条件,若满足,执行下一步,若不满足,则继续执行滤波过程。
4)对完成迭代滤波的道数据计算曲率属性。
5)将结果按地震属性数据格式写回本地文件系统。
单机运行时的计算过程很大程度上受到了计算机CPU核心个数的限制,每个线程运行在一个CPU核上,如果一个计算机是双核的,也就意味着一个读数据的线程和一个计算的线程就已经使CPU满负荷,最大并发量为1,这使得执行的过程就如同串行一般效率低下。借助Spark并行框架改变曲率属性的处理流程,通过集群多节点的并行能力真正提升计算性能,改进后的算法步骤如下所示:
1)预处理地震数据,读取原始地震体中固定字节的道数据并生成对应的文件块头,重复此过程直至地震数据读取完,保证上传到HDFS上的文件块的完整性及其能够被并行框架所处理。
2)读入HDFS中的地震数据并通过标准输入输出类进行转化,将得到的数据通过SparkAPI转化为SparkRDD并持久化到内存。
3)对上一步生成的键值对Tuple的值即地震道数据进行二维高斯滤波,二维高斯滤波过程封装为一个函数写在另外的文件中。
4)比较滤波结果与是否满足迭代截止条件,如果满足,则结束迭代过程,执行下一步,若不满足,则转3)继续执行。
5)根据滤波后的地震属性以Tuple为单位计算曲率属性,同样曲率属性的计算过程也封装在函数中。
6)计算结束后,根据道头(key)值进行reduce操作后写回HDFS。
算法的迭代截止条件可以取固定的迭代次数,也可以选择滤波使噪声减少到某个阈值范围内,而算法的收敛速度取决于设定的收敛条件。改进后的处理算法分散了单节点上的存储和处理压力,同时迭代的数据常驻内存减少了大量的磁盘IO,极大地提高了处理效率[13~16]。
4 Spark计算处理过程
4.1 地震属性预处理
由于地震属性有着特定的文件格式,文件头数据和道数据集中在同一个大的地震体文件中,如果直接将整个地震属性文件上传至HDFS并由HDFS完成对数据的默认分块切割必将导致数据完整性的破坏而无法处理。因此需要在上传前对文件进行预处理,从而保证分块后得到的每一个数据块都有一致且完整的结构。预处理的过程如图2所示,读取固定字节的地震属性数据并据此生成固定格式的文件头,将这两块组合起来形成一个大小不超过HDFS块大小的文件块,数据部分存储实际地震道的地震数据,文件头存储这块数据的坐标范围、地震道数以及采样间隔等。这样处理后的文件块有着局部的数据完整性并可以保证不被HDFS所切割,同时也可保证上层的并行处理框架在读数据时的一致性而大大减少由于处理的数据跨文件块而产生的网络通信开销。在预处理的过程中应保证生成的文件块大小接近HDFS设置的块大小,因为如果生成的文件过小会导致处理过程NameNode的内存占用较高而影响集群处理性能。
图2 地震属性文件结构预处理
4.2 地震属性读入与RDD变换
上传到HDFS上的地震属性有着固定的格式,所以无法直接使用Spark读入,这里借助Hadoop的InputFormat类和 RecordReader类,InputFormat类实现了数据的分割,重写其getSplit函数可以得到用户自定义的数据分片InputSplit,RecordReader类负责处理得到的每个分片,RecordReader处理之初会分析数据片头的信息,其中记录着这片数据的坐标范围和深度等,如果提交的处理任务不在此范围内,则直接跳过此分片转而处理下一分片。在Re⁃cordReader中通过重写nextKeyValue函数将分片中的数据转化为一条条键值对数据。在实际实验中,最后得到的每一条键值对的数据都是以一个数据块内横坐标相同的多个地震道的道头数据为key,实际地震数据为value的键值对。之所以选择横坐标相同的多道地震属性来构成一组键值对,是因为保证横坐标相同,纵坐标在纵向方向上变化便会形成一个地震剖面,随后进行二维高斯滤波中会使用高斯函数对这组剖面数据区域做迭代滤波处理,滤波的窗口大小取决于高斯核的宽度,即式(5)中的σ。
在得到Spark能使用的标准输入后,使用Ja⁃vaPairRDD读入InputFomat得到的标准输入,至此我们的地震数据就变换为了内存中的RDD,并行部分的迭代计算过程如图3所示,在Spark的map函数中完成对数据的二维高斯滤波,滤波函数编写在另外的文件中,通过调用其API完成滤波过程,经过滤波后的数据还是一个Tuple类型的RDD,迭代滤波的过程如下:
在此使用固定窗口大小的地震数据建立噪声标记矩阵Ai,假设噪声点的个数为Bi,整个噪声检测的过程是以窗口为单位的,完成第一次迭代的高斯滤波过程,然后再使用窗口内数据重新建立新的噪声标记矩阵Ai+1,此时的噪声点个数为Bi+1,实验中当两次迭代间的噪声点的个数减少率少于5%,即(Bi-Bi+1)Bi<0.05时迭代结束,整个迭代滤波的过程数据都是以RDD的形式在内存中进行计算完成,中间不存在磁盘的读写过程,当迭代完成后再根据所要求的曲率属性类型计算目标曲率属性值。
图3 使用Spark框架的并行迭代过程
图4 为整个计算的流程图,当计算完曲率属性之后,由于key值是横坐标相同的多道地震道的道头信息,所以此key值对应的数据也是唯一的,因此reduce操作的过程只需完成key值排序,通过标准的OutputFormat类将结果数据转化为标准的地震属性数据输出格式并使用Spark的action操作将曲率属性持久化到磁盘。
图4 使用Spark框架提取曲率属性流程图
5 实验分析
5.1 实验软硬件环境
Hadoop与Spark架设在同一个集群环境中,集群使用单节点主频为2.8GHz的Intel I5 6400四核处理器,8G物理内存,800GB SATA硬盘的多台PC机组成,使用64位CentOS7(内核版本为3.10.0)作为操作系统,Hadoop版本是2.6.0,Spark版本为1.2.4。
5.2 实验设计与数据
实验从纵向和横向两个方面比较了使用Spark计算框架后的二维高斯迭代滤波曲率地震属性抽取效率,纵向上比较了单机执行、Hadoop集群以及Spark集群三种处理模式随着数据量的增长计算性能变化情况。横向上对比了基于相同数据量的处理任务Spark集群随着节点个数的变化对计算性能的影响。另外,为了验证Spark在进行迭代滤波方面的优势,通过改变迭代截止条件来观察迭代次数的变化对Spark和Hadoop集群执行效率的影响。实验使用某油田A区块的地震属性作为数据源,共计1765590道(范围为1374×1285)地震数据,每道包含1001个采样点。
5.3 实验结果及分析
实验首先测试了随着数据量的变化单机多线程、Hadoop集群与Spark集群三种计算模式的处理性能差异,两种集群计算模式均使用5个节点。从图5所示的对比结果中可以看出,与单机运算相比,在处理的数据量在10万道~40万道时集群计算模式并未展示出太大优势,甚至存在集群的计算时耗多于单机运行,这是因为数据量不够大,对于Hadoop和Spark集群在做任务启动的过程就会占用相当一部分时间,但当数据量上升至80万道乃至更多的时候,集群的任务启动在整个处理过程中所占的比重就很小了,这时集群的计算性能远远超出了单机节点的计算能力,同时,随着处理数据量的增大,Spark集群的计算能力明显优于Hadoop集群,最多能快出约一个数量级。
图5 三种计算模式计算时间随数据量变化图
保持处理的数据量不变,改变集群中节点个数,观察并记录计算任务所需时间得到如图6所示结果,从图中可以看出,随着节点个数的增大,对于相同任务量的处理任务计算时间减少速度下降较快,随着节点个数再增加,计算时间减少速度趋缓。另外,在集群节点个数相同时,数据量越大,集群计算性能越高。
图6 Spark集群计算时间与节点个数关系图
为进一步验证滤波迭代波过程中迭代次数的不同对Spark和Hadoop集群中计算性能的影响,改变二维高斯迭代滤波的截止条件,并统计不同截止条件下的迭代次数,观察在不同迭代次数时各种计算模式的性能表现。实验中分别采用迭代滤波的截止条件为两次滤波噪声点个数的减少率小于50%、20%、10%、5%、2%,统计四种不同的截止条件对应的迭代次数分别为2次、5次、11次、18次、34次,集群环境使用5个节点、数据量为170万道,实验结果如图7所示。从图中可以看出,在迭代次数较小的时候,三种模式的计算时间相差并不明显,但随着迭代次数的增加,计算的任务量变大,集群环境相比单机环境的计算性能开始显现,同时Spark集群相比Hadoop集群在迭代计算方面的优势表现的越发明显,当迭代次数达到34次的时候,Spark集群的计算性能约为Hadoop集群的10倍。
图7 三种计算模式计算时间随迭代次数变化图
在实验的过程中,三种计算模型的任何一种在第一次迭代时都需将数据读入内存,另外对于基于分布式计算的Hadoop和Spark两种框架,任务执行之初需要对数据进行预处理,这些也都属于数据任务执行中的时间消耗。而在将数据读入内存的过程中,也可能存在一些IO繁忙或者CPU占用率高等随机事件,所以实验采用了多次计算后的平均时间作为标准以尽量减少随机误差的影响。总的来看,在对地震属性数据做迭代滤波的计算方面,Spark的处理性能相比与Hadoop MapReduce确实有着相当大的优势。
6 结语
在面对需要进行迭代计算或分析的地震属性处理任务时,单机的处理引擎由于存储和处理能力的局限经常无法满足要求,Hadoop MapReduce在处理迭代计算时又会浪费大量的时间在磁盘IO上,导致整体性能低下。本文提出了基于Spark框架的地震属性数据分析方法,以HDFS为数据源,针对曲率地震属性进行二维高斯迭代滤波,实验证明与单机计算和Hadoop计算相比,Spark框架的整体性能更为优越,在处理需要进行迭代式计算的地震属性数据时更加适用。