APP下载

基于消息的气象数据处理系统运维自动化初探

2022-02-10李从英支亚京张淑莹杨远恒

中低纬山地气象 2022年6期
关键词:处理程序积压队列

李从英,支亚京,张淑莹,李 波,杨远恒

(贵州省气象信息中心,贵州 贵阳 550002)

0 引言

“数算一体”的天擎系统具备海量数据存储、全业务贯通、数据应用高效的能力,可直接支撑天气、气候、探测、公服、人影等各类气象应用的云化融入。数据种类覆盖全国综合气象信息共享平台(China Integrated Meteorological Information Service System,CIMISS)全数据集,数据质量更高,性能大幅提升,具备直接支撑应用云化改造的能力。

天擎系统数据处理在技术和数据种类较CIMISS[1]大幅度提升的同时,保留和继承了CIMISS的大部分标准规范,是对CIMISS的继承和发展。数据处理系统(Data Processing Center,DPC)是天擎的重要环节,所有的数据由CTS(Communications Technology System)收集后,都要经过DPC系统的解码处理,才能及时入库。因此DPC是保障数据及时处理并入库的核心系统,DPC运维工作至关重要。

天擎系统建立快速传输、实时质控和解码处理的一体化流程;根据气象数据的存储和应用特点、传输方式、数据形态设计了9种处理框架。天擎DPC系统主要负责结构化、非结构化以及storm流式等资料数据处理,由18台服务器支撑,框架多,运维复杂度高,因此,探究DPC系统自动化运维意义重大。

1 RabbitMQ在DPC系统中的应用

数据入库在整个天擎系统中主要覆盖消息转发、消息接收、消息解码、存储管理等环节。消息队列(message queue)即消息中间件,它提供的有保障的消息传递、有效的路由、安全性、事务处理支持以及基于优先级的消息传递方式[2],在气象通信系统中有着广泛的应用。RabbitMQ由于开源、支持多种语言的客户端以及适合在分布式系统中消息存储转发的场景等特点,非常符合气象数据传输业务需求[3]。为解决观测资料传输频次和海量数据传输问题,选用RabbitMQ消息队列技术进行数据传输[4]。现有数据处理流程为:CTS[5]通过FTP把数据文件推送到数据交换共享目录,发送通知消息,原始消息发送至RABBIT5671端口,将拆分后的单条消息发送至RABBIT5672端口,后续推送给DPC处理(图1)。

图1 数据处理流程

在实际工作中,数据解码入库往往会存在一些问题,主要有以下2种情况。一是系统本身特征导致的。由于DPC系统处理程序种类多,且部署在不同的服务器上,很多处理程序因为进程掉线、进程假死等原因,造成消息积压、不能及时处理,进而导致数据解码入库异常。二是人员修改消息转发配置导致的。人为修改消息转发配置出错等原因造成上游收到消息后并没有转发到下游的消息队列中。现有传统的监控只能在进程是否掉线等粗线条方面进行告警,以上2种异常情况都不能通过现有监控手段抓取到,只能通过人工巡检检查时发现,而这时已经严重影响到预报和服务等实时业务。鉴于以上现状,我们开展了DPC数据处理系统自动化运维,以此保障数据的正常及时入库,为天气预报和服务业务提供及时的基础数据保障。

2 研究方法

队列(Queue)是RabbitMQ的内部对象,用于存储消息队列,并将它们转发给消费者[6],它是消息的容器,也是消息的终点。消息一直在队列里等待消费者将其取走。根据消息是积压还是长时间没有接收到2种情况,我们需要获取消息的具体数量。由于RabbitMQ的消息内容可以处理成严格的JSON格式数据,然后从中提取关心的字段,以达到监控的目的。本文首次引进了jQuery技术来获取RabbitMQ消息队列相关详细信息,并利用这些信息,可以更好地监控消息队列。

RabbitMQ消息的持久化[7]设置,使得在消息转发服务重启之后,已经存入磁盘的消息也会存在。也就是说,在消息没有被消费者消费的情况下,消息会一直存在。利用消息的持久化,根据具体消息队列messages字段,可以获得消息数量。根据Queue队列的字段next_seq_id没有持久化设置,来判定是否存在长期没有收到消息。

2.1 jQuery技术简介

jQuery[8]是一款在Linux系统操作中轻量级且灵活的处理JSON数据的工具,它可以接受标准输入,命令管道或者文件中的JSON数据,经过一系列的过滤器(filters)和表达式的转换后,形成我们需要的数据结构并标准输出。这种特性使我们能很容易在Shell脚本中调用它,将数据转换成理想格式。

要使用jQuery技术来处理消息,首先需要在Linux服务器上安装jQuery软件,本项目中使用的是jq-1.5版本。由于是在服务器内部使用,可以直接在github上下载,下载后将安装文件包上传服务器解压缩部署即可。如果服务器可以连接外网,可以在部署的Linux服务器上通过命令yum install jq来安装。

2.2 自动化运维的设计与实现

消息中Messages字段是记录消息数量,可以通过以下命令获取消息数量:

curl --silent -u usernm:passwd "http://ip:15672/api/queues/%2f/queuesname"|jq ".messages"

设置messages大于一定数量时,将对应队列的名称记录到日志中,并自动到DPC处理服务器上找到对应的处理程序,将其重新启动。

next_seq_id是记录消息的总数量,消息有持久化设置且不会清零的特征,只要有新的消息来,next_seq_id的值就会增加。利用上述原理,如果next_seq_id在一定时间内没有变化,将会启动报警,提示运维人员检查RabbitMQ没有收到消息的具体原因。可通过下面的命令行获取next_seq_id值:

curl --silent -u usernm:passwd "http://ip:15672/api/queues/%2f/queuesname"|jq ".backing_queue_status.next_seq_id"

其中usernm、passwd分别是安装RabbitMQ时设置的用户名和密码;ip是消息服务器部署地址;2f为队列所在的虚拟主机名;queuesname对应要监控队列的队列名。

2.2.1 消息积压时的处理 消息积压但处理程序没有报错的情况下,说明处理程序虽然显示在线,但是实际并没有工作,程序处于假死状态,现有监控无法检测。因此程序代码设定这种情况下,自动到DPC处理服务器上找到对应的处理程序,将其重新启动。

通过程序判断,当messages大于10时,监控程序会自动到处理程序所在服务器上重新启动处理程序,messages的值可以根据实际需求设置。由于自动处理脚本是对几千个队列的监测,不能在某个处理程序重启之后就停止整个脚本的执行,积压的消息不能迅速处理完,可能会出现多次重新启动同一个程序的情况,因此在脚本中设置重新启动程序后休眠20 s,以给处理程序更多的时间处理积压的消息。当messages值小于10时,等待程序处理消息入库即可。结构化资料积压时,脚本会自动到DPC14和DPC15上去重新启动对应结构化数据解码程序;非结构化资料积压时,脚本会自动到DPC04和DPC05上去重新启动对应非结构化数据解码程序;半结构化资料积压时,脚本会自动到DPC06或DPC07上去重新启动对应半结构化数据解码程序,需要注意的是DPC06和DPC07服务器上处理的是不同资料的半结构化数据;对于一些省内自有资料,本省将解码程序部署在DPC18服务器上。在自动化运维处理过程中,消息积压的队列名称、积压数量以及处理过程都将写入日志,方便对频繁出现问题的资料进行原因分析。程序核心代码protect_messages函数如图2所示。

图2 核心代码

通过以上代码调用protect_messages,并给其传参"AGME_PQC_E.0001.0007.R001_001""DPC14" "DPC15""/space/cmadaas/dpc/CMADAAS_DPC_DECODE/exec""E.0001.0007.R001.sh",再通过定时任务来设定监控频率,或通过命令守护监控程序一直处于工作状态。

其中,“AGME_PQC_E.0001.0007.R001_001”表示待监控的队列名称,“DPC14,DPC15”表示对应处理程序所在的服务器名称或处理队列数据的程序所在服务器名称,“/space/cmadaas/dpc/CMADAAS_DPC_DECODE/exec”表示处理程序所在服务器上的路径,“E.0001.0007.R001.sh”表示处理程序名称。如需要监控多个消息队列,添加多个调用命令语句即可。

2.2.2 长时间没有收到消息的处理 由于长时间没有收到消息的情况,不能通过消息积压或者程序不在线等方式处理,只有人工巡检才能发现,因此目前只能通过告警的方式,第一时间通知值班员人工查找原因,以此减少影响时间。本文通过脚本获取next_seq_id值后,将该机制引入了Zabbix[9-10]的监控[11]告警模块。将获取next_seq_id值的脚本文件部署到相应Linux服务器后,通过在Zabbix调用脚本,设置监控项中abschang()函数,当监控的当前时次消息队列next_seq_id值和上个时次的值之差为零时,就会被抓取告警反馈给运维人员。

3 结论

本文基于消息粒度对数据消息进行监控,在消息积压的情况下,放弃传统只告警的监控这一手段,采用自动重启解码入库程序,保障基础数据及时入库,再将处理流程写入日志,供后期原因分析使用。同时对监测长时间没有收到消息的情况,能及时提醒运维人员,辅助缩短排除故障的时间。实现了DPC处理程序不工作却不能及时发现并直接处理故障,保障数据及时入库,减轻了运维人员的工作压力,提高了工作效率,解决了2种常见故障的监控难点。

猜你喜欢

处理程序积压队列
高速公路工程变更与计量支付处理程序的优化方法
珠三角水产品存塘积压真的大得惊人吗?千万别好心帮倒忙
队列里的小秘密
基于多队列切换的SDN拥塞控制*
波音的烦恼
在队列里
丰田加速驶入自动驾驶队列
基于C++的数控加工通用后处理程序的开发应用研究
企业危机公关管理问题分析
处理房地产纠纷中行政与民事交叉问题的正当程序