Golang语言实现的流水线模型
2020-06-11王晓峰
文/王晓峰
(中国电信股份有限公司海南分公司 海南省海口市 570105)
众所周知,阿里双11 的实时交易额使用了Flink 技术,高峰时每秒处理交易数据4.7 亿个。Flink 这么高的处理能力是得益于底层采用了流水线模型,流水线(pipeline)模型是对现实世界的流水线作业方式的完美模仿。本文就由浅入深的用Golang 语言实现一个简单地高并发流水线。Golang 语言实现的流水线(pipeline)模型,可以允许开发者以优雅的方式构建出类似Linux Pipe 的高效利用I/O 和多核CPU 优势的数据流水线(data pipeline)。
现实中流水线就是一个从传送带取出物品,进行处理后,再放到下一个传送带三个步骤。而在Golang 语言里,传送带就是channel,中间每个环节就是从channel里取数据,然后进行数据操作,再把操作结果放入下一个channel 的三个步骤。因此中间每个操作的输入是一个channel,输出也是一个channel。
流水线的最开始的环节只是把需要处理的数据放入channel,因此没有输入,通常称最开始的环节叫做“生产者”;最后一个环节是收集处理完的数据,因此没有输出,通常把最后一个环节叫做“消费者”。
从而流水线可看做,生产者把数据送入通道(channel),经过一系列处理,消费者从通道(channel)得到处理后的数据。
1 用Golang语言实现一个链式操作的流水线
1.1 简单流水线需求
假设需要对一组整数进行处理:要求对每个数进行平方操作后,再对每个数加10,最后打印出处理后的数据。
1.2 简单流水线设计思路
根据前面描述的需求,我们分析首先生产者是产生一组数据,消费者是最后对数据的打印,中间处理环节是对每个数进行平方操作,和对每个数加10。每个环节都是把上一环节的输出通道当做本环节的输入(为了保证通道不会阻塞,必须给通道设置一个合适大小的缓存),因此这里需要分四步进行。
(1)生产者输出数据到通道,当输出完毕后,关闭输出通道。
(2)从生产者输出通道上取数据,进行平方操作后,把结果放入通道,把通道输出,同样输出完毕后,关闭输出通道。
(3)从上一环节输出的通道取数据,进行加10 操作后,把结果放入通道,把通道输出,同样输出完毕后,关闭输出通道。
(4)最后消费者从上一环节输出的通道中,取出数据进行打印。
1.3 链式调用优化
为了使得代码的可读性、可维护性和可变更性变强,进行链式优化。我们只需把所有函数封装成通用通道类型下的方法,再实例化一个通用通道对象,即可把流水线模型的主程序调用转换为对象方式的链式(也称流式)调用。
1.4 优化成带扇入、扇出并行的流水线
在现实世界里,流水线的中间一些环节可能需要并行操作,比如以汽车生产流水线为例,在安装轮子这个环节时,是同时进行4个轮子的安装,在四个轮子都安装好后,再进入下一工作环节。这个过程就存在了任务的分发、和并行任务结果的汇聚。在Golang语言的流水线里,就称作扇入(FAN-IN)、扇出(FAN-OUT)。
扇入(FAN-IN),就是把多个通道上的数据汇集到一条通道上,是一种收敛的模式,主要用来收集处理的结果。
图1
图2
扇出(FAN-OUT),多个多个goroutines 共享一个通道,简单讲就是多个方法(或函数)从一个通道上读取数据,一般用于分发数据到多个协程并发处理。如图1 所示。
为了并发执行,我们可在第二个环节,计算平方方法实现3 个goroutine、第三个环节,加10 运算方法,实现2 个goroutine。因此在第二和第三环节要进行扇入和扇出操作。第一和第四环节保持不变。
1.5 并行的流水线设计思路
第一和第四环节保持不变。第二和第三个环节,需要进行扇出处理。把第一环节输出的通道扇出到多个平方计算的协程上,每个协程把结果输出到各自的输出通道,然后再把三个通道扇入到一个通道作为第二环节的输出通道。
同样第三环节把第二环节输出的通道扇出到多个个加法协程上,每个协程把结果输出到各自的输出通道,然后再把二个通道扇入到一个通道作为第三环节的输出通道。如图2 所示。
1.6 带扇入、扇出的的流水线代码
保持原先定义的通用的通道类型,chan int 的通用类型MyChan type MyChan chan int
在第二和第三环节进行扇入、扇出修改,修改后的代码如下:
(1)第一环节保持不变,生产者输出数据到通道。
(2)第二环节需要进行扇入扇出,设置需要并发计算平方的数为threadNum,先进性扇出操作:从生产者输出通道上取数据,进行threadNum 个协程平方操作后,把结果分别放入通道。然后进行扇入操作:把多个输出通道扇入到一个通道(out),并返回这个通道out。
(3)第三环节仍然需要进行扇入扇出,先进性扇出操作:定义并发函数addten(threadNum int),从上一环节按照threadNum数量扇出到addten(threadNum int),进行加10 操作。然后再进行扇入操作:把threadNum 个输出通道扇入到out 通道。最后把输出的通道作为返回值返回。
(4)最后消费者从上一环节输出的通道中,取出数据进行打印,同样把上一环节的通道作为函数的参数,但不用再返回通道。
2 结论
Golang语言的流水线(Pipeline)模型是对现实世界的完美模仿,流水线模型逻辑清晰,易于实现,各环节并发可调节,适合大量数据的处理工作,流水线模型结合链式调用模式是代码可读性大大提高,有利于维护和修改。Golang 的流水线结合Contex 可实现对流水线的控制,当不需要生产者发送时数据时,简单的一个cancel()就能终止数据发送,并关闭所有通道,关于Contex 又是一个很大的话题,本文就不再论述了。
流水线涉及到大量并发和通道的操作,一旦出现错误处理就比较复杂,因此建议力求把流水线的每个环节设计完善,做好测试,尽量确保每个环节逻辑清晰。当上一环节发送操作结束时,每个环节都应该及时关闭自己的输出通道。同时也尽量提供足够大的缓冲保存发送者发送的数据。