APP下载

基于Zookeeper构建准实时索引更新系统及其监控

2020-04-05邓杰童孟军胡文泽林英杰胡燚

计算机时代 2020年2期
关键词:分布式

邓杰 童孟军 胡文泽 林英杰 胡燚

摘  要: Mysql数据库表切换过程,需要保证两个相同表同时写入。Solr作为企业级的搜索框架,其自带的索引更新系统是增量更新,实时性差。为了满足互联网企业对数据的实时性要求,设计和实现了一款基于Zookeeper的能够准实时更新Solr索引和实现数据库双写的系统。用户需要提交配置信息到系统,当Mysql数据源发生内容变更时,系统就能够实时捕获,将变更内容经过数据转化并实时同步更新Solr索引。

关键词: Zookeeper; Solr; Mysql; 实时同步; 分布式

中图分类号:TP392          文献标识码:     文章编号:1006-8228(2020)02-58-04

Quasi–real-time index update system using Zookeeper and its monitoring

Deng Jie, Tong Mengjun, Hu Wenze, Lin Yingjie, Hu Yi

(College of Information Engineering, Zhejiang A&F University, Hangzhou, Zhejiang, 311300, China)

Abstract: Mysql database table switching process, it is needed to ensure that two same tables are written at the same time. As an enterprise-class search framework, Solr's built-in index update system is incremental update with poor real-time performance. In order to meet the real-time requirements of Internet enterprises, a system is designed and implemented by using Zookeeper, which can quasi–real-timely update Solr index and realize database dual write. Users need to submit configuration information to the system. When the content of Mysql data source changes, the system can capture the changed content in real time, and update Solr index synchronously in real time through the data transformation.

Key words: Zookeeper; Solr; Mysql; real-time; synchronization

0 引言

当前互联网企业对数据的实时性要求越来越高。所以本文研究的內容着眼于实时的数据同步,整个系统分为数据抓取分发平台部分和索引更新部分。基于这个系统来解决下面两方面的问题。

⑴ 很多公司会将部分数据冗余存在搜索平台Solr上,当用户请求数据时,直接从搜索平台Solr上获取而不是访问数据库(因为数据库不支持复杂的搜索逻辑)。所以需要一款能够实时更新搜索索引的系统,当数据库内容发生变更时,及时更新对应的Solr上的索引。

⑵ 有些特殊情况需要替换线上运行的数据库mysql内的某个表,但又因为是线上运行的数据库,不应该让业务感知到这种变更,所以需要一个系统进行渐进的替换过程。

1 核心框架和相关技术介绍

1.1 系统运用到的核心框架

zookeeper提供分布式协调服务,提供诸如统一命名服务、配置管理和分布式锁、分布式消息等分布式的基础服务。它是一个典型的分布式数据一致性的解决方案,分布式应用可以基于zookeeper实现发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理/master选举、分布式锁和分布式队列等功能[1]。功能强大又好用。

kafka是一款基于发布订阅模型的消息队列。它是一个能够提供实时数据传输的平台,具有高吞吐、低延迟的特点。使用它的原因:一是可以处理更多的消息,不受单台服务器的限制,二是分区可作为并行处理的单元。这样使所有的请求给多台服务器处理[2]。

Kafka Connect是一种用于Kafka和其他数据系统之间进行数据传输的工具。仅关注数据的复制,不处理其他任务,对数据的传输进行管理和监控。

Debezium是Kafka Connect的一种实现,主要用于数据库和kafka之间的数据传输。它是一个CDC(Change Data Capture)系统,能实时捕获上游数据的变动。然后记录到一个或者多个Kafka topic。

solr是一款开源的企业级搜索框架,其主要功能有全文检索,分词,拼音检索等。通过solr索引,能够在短时间从海量数据里得到用户关心的数据。

1.2 系统运用到的主要技术

⑴ Spring技术。Spring是现在非常流行的一个框架。SpringMVC是Spring的一个模块,它提供完整的MVC模型解决方案[3]。本系统使用REST接口来管理同步任务,通过使用Spring和SpringMVC能够很方便、快速的搭建一个后端应用,简化开发流程[4]。

⑵ Java多线程技术。只有一个线程的系统,运行效率必定糟糕,运用到多线程技术后可以同时运行不同的同步任务,大大的提高了运行的效率[5]。

⑶ 分布式技术。本系统基于抢占式任务调度方式,保证同步任务的高可用,可以在不同机器运行。

2 系统实现

2.1 实时数据管道

实时数据管道主要数据来自mysql。当数据库信息发生变更时,mysql内变更前和变更后的内容会被发送到实时数据管道上,使各个接入方在极短时间内收到数据库变更信息。

实时数据管道由3个核心部分组成:CDC模块、系统中间体Kafka和Schema registry。

图1是一个简单的架构图,为了展示用户如何通过管道得到数据变更。Debezium实时的抓取到mysql的数据变更并使用Avro将数据序列化得到schema和值,其中schema被提交到schema registry并返回id,之后id和序列化后的值一起发送到kafka。数据管道下游逻辑由开发人员实现:初始化一个KafkaConsumer,订阅指定的Kafka Topic,根据拿到的数据schema id,从schema registry里获取到对应的schema,然后使用Avro将拉取到的数据的值和schema反序列化成一条消息交付给用户。本系统中的实时数据管道部分还增加了基于组件JMX信息的监控,用来实时观察数据管道的状况。

2.2 Solr索引和Mysql数据的同步系统SIS

SIS目的是解决Solr索引实时更新和数据库双写的问题。由服务端和客户端两部分组成。用户向SIS服务端提交任务,SIS客户端从SIS服务端监听到新任务后,启动任务。

⑴ SIS服务端的实现

SIS服务端有提交同步任务、删除同步任务、更新同步任务三个功能。

用户向服务端提交任务,服务端创建/sis/task和/taskX节点。/sis/task节点为SIS同步任务根节点,/sis/task/taskX表示一个同步任务节点,其配置信息都会保存在自己内部。

新任务提交的过程的概括如下:首先SIS服务端启动时,首先会尝试向zookeeper注册/sis/task持久节点。然后用户提交一份同步任务的配置内容到SIS服务端。SIS服务端收到任务创建请求后,向zk创建/sis/task/taskX 持久节点,其中taskX为同步任务的名称。同时,同步任务的配置信息会被保存到/sis/task/taskX节点内。Zookeeper提供的分布式协调功能对同一个节点的多个创建请求,只会有一个请求能成功,这也保证了不会有多个相同任务被创建。

⑵ SIS服务端的设计

SIS客户端的设计围绕zookeepe展开,它负责同步任务组件创建,调度器初始化等工作。这里的实现非常复杂,在这里简单的阐述客户端任务的情况。

图2是SIS客户端创建任务工作流程。其中Client表示SIS服务端,它们以集群的形式运行,每一个Client都是对等的。

⑴ 每个SIS客户端启动时,会向/sis/task节点注册监听器,监听该节点子节点变化情况。当/sis/task子节点增加,删除,内容更新时SIS客户端会收到通知。

⑵ SIS服務端根据用户创建任务请求,创建/sis/task/taskX 任务节点,其中taskX为同步任务的名称,它是一个持久节点。

⑶ 所有SIS客户端都会得到节点/sis/task/taskX被创建的消息。SIS客户端收到回调之后,都会向/sis/task/taskX节点注册监听器。Zookeeper保证只会有一个客户端请求成功,开始任务同时将同步任务的运行状态写入到lock节点内部。

⑷ 如果此时运行同步任务taskX的服务器发生宕机, 那么SIS和zookeeper的连接将会断开,并且lock临时节点将自动删除。剩余的SIS客户端由于添加了对同步任务的监听器会收到同步任务中断的通知,又开始对/sis/task/taskX任务节点加锁。加锁成功的SIS客户端,从任务节点读取配置信息,重新启动同步任务。基于这个机制实现SIS同步任务的高可用。

⑸ 如果用户主动提交删除任务请求,那么SIS服务端首先将/sis/task/taskX/lock锁节点的状态信息更新为WAIT_FOR_CLOSE,这表示该任务节点等待删除,随后删除锁节点和/sis/task/taskX任务节点,之后所有在/sis/task/taskX任务节点注册监听器的SIS客户端都会收到锁节点被用户主动删除的通知,但都不做任何响应。任务节点删除之后,所有向/sis/task节点注册监听器的SIS客户端收到任务节点被删除的通知,SIS客户端根据通知内容判断同步任务是否运行在自己所在服务器来同步任务和清理资源。

⑹ 如果用户主动提交更新同步任务配置信息请求,那么SIS服务端会更新/sis/task/taskX的节点内容。随后所有在/sis/task节点注册监听器的SIS客户端都会收到通知,并根据通知获取到具体哪个任务节点需要更新,随后更新/sis/task/taskX/lock锁节点状态为NEED_UPDATE。之后所有在/sis/task/taskX节点注册监听器的SIS客户端收都会收到锁节点内容被更新的通知,并判断对应的同步任务是否在自己所在的服务器,如果是则再次判断任务状态,如果为NEED_UPDATE,那么就停止老的同步任务,清理资源,删除同步任务下的锁节点。锁节点被删除后,和第⑷步类似,创建新的同步任务。

上述六个步骤概述了SIS客户端对任务的调度过程,基于zookeeper的SIS客户端和服务端的实现,让SIS同步任务能够高可用,即使某一台服务器宕机,同步任务也不会中断。

2.3 实时数据管道和SIS

实时数据管道可以应用于以下的场景:数值统计、实时数据分析、响应式编程。实时数据管道能够让开发人员实现实时ETL(Extract-Transform-Load),提供实时、无限的数据流。

SIS可以解决solr索引更新延时大的缺点,实现索引的实时更新。并且还能够完美解决Mysql双写需要开发人员在项目代码里添加额外代码,实现数据写入两个库的问题。

如下是数据管道和SIS相结合实现mysql双写。

现在在同一个数据库里有2张表,分别是userinfo和test表。本系统将实现:当userinfo表有内容变更时,test表能立刻同步。用户提交配置内容到同步服务器,指定userinfo表的变更需要被同步到test表,点击create创建数据库双写同步任务。

提交的配置信息如图3,指定SIS同步的数据来自数据管道userinfo相关的topic。中间有多个处理过程包括数据的冗余,转换。最后数据会被写入mysql的test表。

从图4可以看到,Userinfo表里userid为1的数据变更前timestamp字段的值为null,当行内容有更新时,timestamp值会被自动更新为内容更新的时间。

更新userinfo表userid为1的行数据,将username更新为name111,如图5所示。timestamp字段的值被更新为此行内容变更时的时间2018-05-15 23:37:03。因为之前创建了mysql双写同步任务,所以userinfo的变更内容,会被同步到test表。test表的username值变更为name111,并且timestamp为test表userid为1的行变更时的时间 2018-05-14 23:37:04。和userinfo的timestamp值2018-05-15 23:37:03相比,同步userinfo变更内容到test表,只花费了1秒。

2.4 对比

图6⑴是使用本系统后的效果图。highwater表示每个时间点mysql总共有多少变更数据,offset表示当前消费的数据量。从中可以看到,2条线是重合的,也就是说在每个时间点的mysql变更,本系统都能够实时的处理消费。由于监控的原因,offset可能会高于highwater,offset高于highwater表示SIS消费是完全跟上了mysql变更。

图6⑵是不使用本系统而是使用增量的方式处理消息的延时图。可以看到下面的线offset总是经过一段时间后才上涨,而不能做到实时的和highwater保持一致。通过对比,使用了本系统后,能夠做到数据的实时处理。

3 总结

本文主要研究了一个通用的,能够服务于不同系统的数据同步系统。通过本系统,开发人员只需要编写一份简单的描述文件,说明要同步的数据从哪来、到哪去,比如指定需要同步的数据是哪个mysql的表,这些数据会被同步到solr还是mysql的另一个新表等。系统根据配置内容就能自动同步。有了此系统能够让开发人员专注于业务开发而不需要花费大量的精力在业务之外的代码编写上,提高了开发效率。

参考文献(References):

[1] 倪超.从Paxos到Zookeeper:分布式一致性原理与实践[M].北京:机械工业出版社,2015.

[2] 牟大恩.Kafka入门与实践[M].人民邮电出版社,2017:59-89

[3] Craig Walls.Spring实战(第4版) [M].人民邮电出版社,2016:187-205

[4] 明日科技.Java Web从入门到精通 [M].清华大学出版社,2012:78-89

[5] 葛一鸣,郭超.实战Java高并发程序设计[M].电子工业出版社,2015:100-110

[6] 鸟哥.鸟哥的Linux私房菜[M].人民邮电出版社,2010:120-150

[7] 克雷格·沃斯.Spring Boot实战[M].人民邮电出版社,2016:93-134

[8] Bruce Eckel.Java编程思想(第4版)[M].机械工业出版社,2007:135-150

[9] 疯狂软件.Spring+MyBatis企业应用实战[M].电子工业出版社,2017:87-102

[10] Raoul-Gabriel Urma, Mario Fusco, Alan Mycroft.Java 8 in Action[M].USA:Manning,2014:153-160

猜你喜欢

分布式
基于预处理MUSIC算法的分布式阵列DOA估计
西门子 分布式I/O Simatic ET 200AL
家庭分布式储能的发展前景