基于MapReduce的气候数据的分析的设计与实现
2020-08-21刘兆丰
刘兆丰
摘要:在社会的发展过程中,天气对社会中的很多行业都有一定的影响。在当前阶段,天气预报作为全球主要对天气进行预测的手段,在这种情况下,全球的气候行业也产生了大量的数据(PB级别),并且数据每年还在持续的增长。为了解决使用传统的关系型数据库很难存储与分析的问题,需要采用大数据的相关技术对气候数据进行分析。
本设计采用MapReduce生态圈的一系列的工具,通过使用虚拟机的平台,采用爬虫软件进行数据爬取的方式,用Map阶段和Reduce阶段的数据分析,分布式系统的数据存储的方法,实现气象数据的采集分析,并且将其展现出来的方式展示。使用这系列流程的系统可以辅助有关部门或企业进行决策部署。
关键词:MapReduce;气候数据;大数据;计算机
中图分类号:P41文献标识码:A文章编号:1672-9129(2020)03-0047-03
Abstract:Intheprocessofsocialdevelopment,weatherhasacertainimpactonmanyindustriesinthesociety.Atthecurrentstage,weatherforecastingisthemainglobalweatherforecastingtool,andinthiscontext,theglobalclimateindustryalsoproducesalargeamountofdata(petabytes),andthedatacontinuestogroweveryyear.Inordertosolvetheproblemthatusingtraditionalrelationaldatabaseisdifficulttostoreandanalyze,itisnecessarytousebigdata-relatedtechnologiestoanalyzeclimatedata.
ThisdesignadoptsaseriesoftoolsofMapReduceecosystem.Throughtheuseofvirtualmachineplatform,crawlersoftwareisadoptedfordatacrawling,dataanalysisinMapstageandReducestage,anddatastorageindistributedsystem,meteorologicaldatacollectionandanalysisarerealizedanddisplayed.Systemsthatusethisseriesofprocessescanassistdepartmentsororganizationsintheirdecisionmakinganddeployment.
Keywords:MapReduce;Climaticdata;Bigdata;Thecomputer
1数据采集
1.1在线API数据获取。此模块的实现分为两大部分:在线API数据获取与数据分析。这里通过在特定网站提取天气数据:
StringbaiduUrl="http://api.map.baidu.com/telematics/v3/weather?";
publicstaticvoidmain(String[]args)throwsIOException{
URLurl=newURL(baiduUrl);
URLConnectionconn=url.openConnection();
while(reader.readLine()!=null){
StringBufferstr=newStringBuffer();
str.append(url+"");
这里此设计将使用百度天气作为提取的天气数据的来源。通过链接的方式,天气网页url转变成字符流,通过粘贴的方式将所获取的数据写入进stringBuffer中。
1.2数据处理。
经过第一步需要处理的数据,称之为元数据。因此将会有一些无用的部分。将所需数据进行提取,而此方法提取的数据为JSON格式,通过使用下面的方法獲得信息。
1.3JSON格式介绍:
JSON:一种与开发语言无关的轻量级数据存储格式。全名是JavaScriptObjectNotation,它是数据格式的标准规范,最初是从JavaScript语言派生的,该语言具有用于JSON处理的API,Json由对象,数组,字符串组成。
1.4代码程序实现。
初始化百度天气url,Stringuri=showURI(k)。提取其中的cell属性。Cellcell=
value.getColumnLatestCell(Bytes.toBytes("page"),Bytes.toBytes("i"))。
通过一个数组配对其中的键值对:
Arrays.copyOfRange(cell.getValueArray(),
cell.getValueOffset(),
cell.getValueArray().length);
Stringflag=Bytes.toString(b);
1.5这里使用writable接口格式做大数据数据类型改写,writable接口简介;
Writable接口,是根据DataInput和DataOutput实现的简单、有效的序列化对象。MR的任意Key和Value必须实现Writable接口.简单来说要实现MapReduce的序列化,所需要的数据必须有一定的格式,因此需要通过改变数据的键值对来实现此接口方便我们接下来转换数据。
2集群搭建
在这里选择3.0.3的hadoop。此处使用linux命令
(1)解压文件到/opt下
sudotar-zxvfjdk-8u91-linux-x64.tar.gz-C/opt
sudotar-xvfhadoop-3.0.3.tar-C/opt
(2)建立软连接
cd/opt
sudoln-shadoop-3.0.3hadoop
sudoln-sjdk1.8.0_91jdk
(3)设置环境变量
cd进入目录
sudovi~/.profile或者sudovi~/.bashrc
(4)进入hadoop的安装目录配置文件中
cd/opt/hadoop/etc/hadoop
假设没有找到core-site.xml,找到core-site.xml.template
sudocpcore-siter.xml.templatecore-site.xml
修改core-site.xml
sudovicore-site.xml
内容如下:
(5)修改mapred-site.xml使用sudovimapred-site.xml命令进入此xml文件之中。
(6)确认YARN集群的模式,如图1.1所示:
(7)MapReduce的具体内容实现地址,如图1.2所示:
(8)MapReduce内map函数的具体内容实现地址,如图1.3所示:
(9)MapReduce内reduce函数的具体内容实现的地址,如图1.4所示:
(10)修改yarn-site.xml
修改YARN集群IP地址,如图1.5所示:
3集群设置
3.1集群权限
(1)hdfs目录下的所属子目录所属用户hdfs,所属组hadoop及其子文件也设置
sudochown-Rhdfshdfs;sudochgrp-Rhadoophdfs;设置权限保证创建文件时系统不会组织,linux的系统由于其特殊的权限特性保证了其安全性。
(2)yarn的所属组hadoop及其子文件
sudochown-Ryarnyarn;sudochgrp-Rhadoopyarn;同理yarn集群也需要相同的操作。
(3)进入/data目录,给hadoop目录及所有子目录或文件赋予权限777sudochmod777-Rhadoop,这里在创建文件夹带有最高权限可以方便查询日志及创建文件夹。
(4)建立yarn和hdfs执行所用到的目录sudomkdir/data;这里是数据落地的地方。
(5)在data目录下建立hadoop文件sudomkdirhadoop子目录用来分批量存储数据
(6)在hadoop中建立hdfs和yarn文件夹sudomkdirhdfsyarn;这里是最终落地的目录。
(7)在hdfs文件下建立的dn,nn,snn文件夹sudomkdirdnnnsnn;三个文件夹存储不同种类的数据。
(8)在yarn下建立logs,nm文件夹sudomkdirlogsnm;这里存储日志文件和日志采集。
3.2配置一键启动功能
一键启动是在主节点一个命令启动HDFS和YARN集群。
(1)主节点安装ssh服务。
(2)解决无密码登陆的问题在主节点之中生成铭文和密文,的两个属性需要在hdfs用户下和yarn用户下各执行一次。
ssh-keygen-trsa-P''-f~/.ssh/id_rsa
(3)将铭文拷贝到从节点的需要无密码登陆的机器上去。
使用命令:ssh-copy-id-i:192.168.43.204
3.3验证无密码登录
(1)使用命令:ssh-copy-id-ilocalhost,ssh192.168.43.204。
(2)Hbase的安装与配置
①重命名目录文件夹
②配置Hbase的环境变量
这里更改path和添加HbaseHome作为Hbase的路径和程序地址。
(3)修改配置文件hbase-env.sh。
4修改配置
(1)在这里修改hbase.Rootdirhbase的根目录需要在这里被固定下来。
(2)修改hbase.Unsafe.stream.capability.enforce這里安全空间是否需要被开启,选择不开启,可以充分利用内存。
(3)修改cluster的权限:这里选择分组确认,选项设置为true。改变分组可以提高zookeeper的数据采集能力。
(4)修改quorum的属性,这里设置为单个端口。
(5)修改分区端口号的属性,这里用作备份,冗余数据可以通过此端口被查询到,这里设置为16030。
(6)设置主节点的日志级别info的端口号,这里设置为16010。
(7)开启文件保护系统在hbase.wal.provider中选择filesystem。
在出现未知故障的时候,比如突然断电的时候选择此项可以开启zookeeper保护模式,保护已经采集到的日志。
(8)启动hbase,通过jps命令可以查询到两个已经开启的命令。主节点可以查询到HMaster,从节点可以查询到HRegionServer。
5MapReduce的编写
5.1map的模块编写
首先将天气的年份,温度,湿度,风力等值作为权重放入到mapper之中。在这里元数据的键值要改变为需求且对应的键值对。使用substring方法可以将固定权重提取。最后将提取出的数据通过一定的顺序拼接在一起。代码实现如下:
staticclassTMextendsMapper
@Override
protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException
以上是创建一个map函数用来将已经提取好的json格式的数据做成二次筛查,放入到mapper里面进行操作。
Stringtemp=value.toString();
Strings1=temp.substring(0,15);//城市天气
Stringyear=temp.substring(15,19);//年份
Stringtem=temp.substring(87,92);//温度
Stringval=temp.substring(92,93);//【01459】
在这里将所要的数据通过权重比较的方式,分批进入mapper中。
if(!"+9999".equals(tem)&&"01459".contains(val))
context.write(newText("城市编号:"+s1+"年份:"+year),newText(temp));
在这里拼接数据,变成城市编号加年份的方式,分批处理好元数据。
5.2使用reduce方式进行操作
创建reduce程序,使用writable接口和迭代器进行操作
staticclassTRextendsReducer
@Override
protectedvoidreduce(Textkey,Iterable
第一步将年份的数据放到reduce之中
Texttext=key;//年份
使用treemap方法采集數据
Map
将所得数据中“:”的部分后面分割出来提取
String[]year_tem=values.toString().split(":");
下面的逻辑是通过ifelse循环分别拿出编号为0和1的不同年份的天气
if(year_tem.length>0&&year_tem!=null){
if(yt.containsKey(year_tem[0])){
if(yt.get(year_tem[0]) yt.put(year_tem[0],Integer.parseInt(year_tem[1]));} } else{yt.put(year_tem[0],Integer.parseInt(year_tem[1]));} } 同理将上述的程序中[]数组中的序号更改之后就可以得到剩下所需不同元素的天气的值,此处代码相近,省略展示。 5.3构建job作业提交 (1)job提交简介,job的工作就是和集群建立连接,创建一个yarnRunner的对象调用job.Submit的方法提交,然后确定job的状态和这个对象通信。所以总体来说,构建job就是作为连接通道的桥梁。 (2)job提交代码实现,每一个提交的类都有其共同的方法,这里每当提交之后可以将job的方法拷贝到每一个mapper和reducer中去。这里将拷贝到input和output中以便使用。重点的是,这里需要重写其中的run方法并且构建作业的时候要重命名class文件,下面是代码说明。构建一个conf方法的采集器,这里将input和output首先构建出对象。 (3)在job提交中,此设计使用了getInstance,setJarByClass,setJobName,setNumReduceTasks四个方法作为前置提交,这四个方法分别提交数据属性,具体的类,数据具体的名称还有此数据在整个hbase中行键的序号。最后一个属性是用来方便定义这个值。 (4)第二个阶段,这里的mapper和reducer方法需要连同第一阶段的已提交的数据的类一起进行第二步的提交。在这里需要用到setMapperClass以及他一系列的衍生方法。用来将mapper的键值对传递到reducer的键值对之中去。之后,reuducer的键值对会进行再一次传输的任务。 最后生成的reducer程序会使用输入流生成文件,这里会设定input和output输入输出的路径,将生成的文件传输到这个路径中去。在这里可能会有等待的问题出现,在这里由于提前开启了线程,因此将等待改为false,关闭即可。 5.4基于年份进行分组排序 上一步将job提交的文件已经写入到指定路径中,之后将已写入的文件进行分组排序,这里分组排序的方法是使用compareto方法。筛选条件可以使用之前已有的条件。包括但不限于风力,天气等元素进行分组排序。代码展示如下: intn=this.year-o.getYear(); returnn; if(n!=0)returnn; returnthis.stationid.compareTo(o.getStationid()); 5.5检测获取的温度是否合理。 (1)当分组排序之后,有一些问可能会超过正常温度的阈值,上述的一些环节虽然有经过一些列的筛查,但是只用到单独的某个条件进行归类,而在数据可靠性上不能得到保证,因此需要使用特定函数方法检测温度合理性。在这里使用ifelse循环做一个筛查。这里定义一个对象isValiadTemperture意为温度是否合理。 (2)权重值说明: (15,19)年份 (87,92)检查到的温度,如果为+9999则表示没有检测到温度 (92,93)溫度数据质量,为【01459】表示该温度是合理温度 (3)代码如下: publicvoidparser(Stringstr){ if(str.length()<93){isValiadTemperture=false;return;} this.stationid=str.substring(0,15); this.year=Integer.parseInt(str.substring(15,19)); if(str.charAt(87)=='+'){ this.temp=Integer.parseInt(str.substring(88,92)); }else{ this.temp=Integer.parseInt(str.substring(87,92)); } Stringquality=str.substring(92,93); if(temp!=MISS&&quality.matches("[01459]")){ this.isValiadTemperture=true; }else{ this.isValiadTemperture=false; } (4)将温度写入HDFS系统中进入数据库。 Hdfs高吞吐量的特性可以帮助数据快速进入到到数据库之中作为实时存储,通过上一章写的定时分析模块,可以做到一段时间内获取所需要的数据,做到精准分析。而数据库的操作命令相对简单,也可以很好的实现增删改查的功能,实时数据分析,完成天气系统的设计。 参考文献: [1]唐果星.浅析气候大数据在行业中的发展趋势[J].电脑知识与技术,2019,15(10):262-263. [2]胡欣滨.基于大数据环境下的气候数据分析[J].科技创新导报,2013(12).79-79. [3]刘喆玥.我国气候大数据的发展趋势研究[J].电脑知识与技术,2019,34(21). [4]杨巨龙.大数据技术全解:基础、设计、开发与实践[J].中国信息化,2014,000(006):71-71. [5]李天目,韩进.云计算技术架构与实践[M].北京:清华大学出版社,2014. [6]周品.Hadoop云计算实战[M].北京:清华大学出版社,2012. [7]陈强.精通JAVA开发技术:由浅入深领会高效开发之道[M].北京:清华大学出版社,2013. [8]刘志成.Java程序设计案例教程[M].北京:清华大学出版社,2006. [9]王鹏.云计算与大数据技术[M].北京:人民邮电出版社,2014.