本文共 4332 字,大约阅读时间需要 14 分钟。
作者:张馨予
本文从数据传输和数据可靠性的角度出发,对比测试了 Storm 与 Flink 在流处理上的性能,并对测试结果进行分析,给出在使用 Flink 时提高性能的建议。
Apache Storm、Apache Spark 和 Apache Flink 都是开源社区中非常活跃的分布式计算平台,在很多公司可能同时使用着其中两种甚至三种。对于实时计算来说,Storm 与 Flink 的底层计算引擎是基于流的,本质上是一条一条的数据进行处理,且处理的模式是流水线模式,即所有的处理进程同时存在,数据在这些进程之间流动处理。而 Spark 是基于批量数据的处理,即一小批一小批的数据进行处理,且处理的逻辑在一批数据准备好之后才会进行计算。在本文中,我们把同样基于流处理的 Storm 和 Flink 拿来做对比测试分析。
在我们做测试之前,调研了一些已有的大数据平台性能测试报告,比如,雅虎的 Streaming-benchmarks,或者 Intel 的 HiBench 等等。除此之外,还有很多的论文也从不同的角度对分布式计算平台进行了测试。虽然这些测试 case 各有不同的侧重点,但他们都用到了同样的两个指标,即吞吐和延迟。吞吐表示单位时间内所能处理的数据量,是可以通过增大并发来提高的。延迟代表处理一条数据所需要的时间,与吞吐量成反比关系。
在我们设计计算逻辑时,首先考虑一下流处理的计算模型。上图是一个简单的流计算模型,在 Source 中将数据取出,发往下游 Task ,并在 Task 中进行处理,最后输出。对于这样的一个计算模型,延迟时间由三部分组成:数据传输时间、 Task 计算时间和数据排队时间。我们假设资源足够,数据不用排队。则延迟时间就只由数据传输时间和 Task 计算时间组成。而在 Task 中处理所需要的时间与用户的逻辑息息相关,所以对于一个计算平台来说,数据传输的时间才更能反映这个计算平台的能力。因此,我们在设计测试 Case 时,为了更好的体现出数据传输的能力,Task 中没有设计任何计算逻辑。
在确定数据源时,我们主要考虑是在进程中直接生成数据,这种方法在很多之前的测试标准中也同样有使用。这样做是因为数据的产生不会受到外界数据源系统的性能限制。但由于在我们公司内部大部分的实时计算数据都来源于 kafka ,所以我们增加了从 kafka 中读取数据的测试。
对于数据传输方式,可以分为两种:进程间的数据传输和进程内的数据传输。
进程间的数据传输是指这条数据会经过序列化、网络传输和反序列化三个步骤。在 Flink 中,2个处理逻辑分布在不同的 TaskManager 上,这两个处理逻辑之间的数据传输就可以叫做进程间的数据传输。Flink 网络传输是采用的 Netty 技术。在 Storm 中,进程间的数据传输是 worker 之间的数据传输。早版本的 storm 网络传输使用的 ZeroMQ,现在也改成了 Netty。
进程内的数据传输是指两个处理逻辑在同一个进程中。在 Flink 中,这两个处理逻辑被 Chain 在了一起,在一个线程中通过方法调用传参的形式进程数据传输。在 Storm 中,两个处理逻辑变成了两个线程,通过一个共享的队列进行数据传输。
都有各自的可靠性机制。在 Storm 中,使用 ACK 机制来保证数据的可靠性。而在 Flink 中是通过 checkpoint 机制来保证的,这是来源于 chandy-lamport 算法。
事实上 Exactly-once 可靠性的保证跟处理的逻辑和结果输出的设计有关。比如结果要输出到kafka中,而输出到kafka的数据无法回滚,这就无法保证 Exactly-once。我们在测试的时候选用的 at-least-once 语义的可靠性和不保证可靠性两种策略进行测试。
上图是我们测试的环境和各个平台的版本。
上图展示的是 在自产数据的情况下,不同的传输方式和可靠性的吞吐量:在进程内+不可靠、进程内+可靠、进程间+不可靠、进程间+可靠。可以看到进程内的数据传输是进程间的数据传输的3.8倍。是否开启 checkpoint 机制对 Flink 的吞吐影响并不大。因此我们在使用 Flink 时,进来使用进程内的传输,也就是尽可能的让算子可以 Chain 起来。
那么我们来看一下为什么 Chain 起来的性能好这么多,要如何在写 Flink 代码的过程中让 Flink 的算子 Chain 起来使用进程间的数据传输。
大家知道我们在 Flink 代码时一定会创建一个 env,调用 env 的 disableOperatorChainning() 方法会使得所有的算子都无法 chain 起来。我们一般是在 debug 的时候回调用这个方法,方便调试问题。
如果允许 Chain 的情况下,上图中 Source 和 mapFunction 就会 Chain 起来,放在一个 Task 中计算。反之,如果不允许 Chain,则会放到两个 Task 中。
对于没有 Chain 起来的两个算子,他们被放到了不同的两个 Task 中,那么他们之间的数据传输是这样的:SourceFunction 取到数据序列化后放入内存,然后通过网络传输给 MapFunction 所在的进程,该进程将数据方序列化后使用。
对于 Chain 起来的两个算子,他们被放到同一个Task中,那么这两个算子之间的数据传输则是:SourceFunction 取到数据后,进行一次深拷贝,然后 MapFunction 把深拷贝出来的这个对象作为输入数据。
虽然 Flink 在序列化上做了很多优化,跟不用序列化和不用网络传输的进程内数据传输对比,性能还是差很多。所以我们尽可能的把算子 Chain 起来。
不是任何两个算子都可以 Chain 起来的,要把算子 Chain 起来有很多条件:第一,下游算子只能接受一种上游数据流,比如Map接受的流不能是一条 union 后的流;其次上下游的并发数一定要一样;第二,算子要使用同一个资源 Group,默认是一致的,都是 default;第三,就是之前说的 env 中不能调用 disableOperatorChainning() 方法,最后,上游发送数据的方法是 Forward 的,比如,开发时没有调用 rebalance() 方法,没有 keyby(),没有 boardcast 等。
对比一下自产数据时,使用进程内通信,且不保证数据可靠性的情况下,。在这种情况下,Flink 的性能是 Storm 的15倍。Flink 吞吐能达到2060万条/s。不仅如此,如果在开发时调用了env.getConfig().enableObjectReuse() 方法,Flink 的但并发吞吐能达到4090万条/s。
当调用了 enableObjectReuse 方法后,Flink 会把中间深拷贝的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入。但需要特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的 Function 均不会改变对象内部的值。否则可能会有线程安全的问题。
当对比在不同可靠性策略的情况下,Flink 与 Storm 的表现时,我们发现,保证可靠性对 Flink 的影响非常小,但对 Storm 的影响非常大。总的来说,在保证可靠的情况下,Flink 单并发的吞吐是 Storm 的15倍,而不保证可靠的情况下,Flink 的性能是 Storm 的66倍。会产生这样的结果,主要是因为 Flink 与 Storm 保证数据可靠性的机制不同。
而 Storm 的 ACK 机制为了保证数据的可靠性,开销更大。
左边的图展示的是 Storm 的 ACK 机制。Spout 每发送一条数据到 Bolt,就会产生一条 ACK 的信息给 ACKer ,当 Bolt 处理完这条数据后也会发送 ACK 信息给 ACKer。当 ACKer 收到这条数据的所有 ACK 信息时,会回复 Spout 一条 ACK 信息。也就是说,对于一个只有两级(spout+bolt)的拓扑来说,每发送一条数据,就会传输3条 ACK 信息。这3条 ACK 信息则是为了保证可靠性所需要的开销。
右边的图展示的是 Flink 的 Checkpoint 机制。Flink 中 Checkpoint 信息的发起者是 JobManager。它不像 Storm 中那样,每条信息都会有 ACK 信息的开销,而且按时间来计算花销。用户可以设置做 checkpoint 的频率,比如10秒钟做一次 checkpoint。每做一次 checkpoint,花销只有从 Source 发往 map 的1条 checkpoint 信息(JobManager 发出来的 checkpoint 信息走的是控制流,与数据流无关)。与 Storm 相比,Flink 的可靠性机制开销要低得多。这也就是为什么保证可靠性对 Flink 的性能影响较小,而 Storm 的影响确很大的原因。
最后一组自产数据的测试结果对比是 Flink 与 Storm 在进程间的数据传输的对比,可以看到进程间数据传输的情况下,Flink 但并发吞吐是 Storm 的4.7倍。保证可靠性的情况下,是 Storm 的14倍。
上图展示的是消费 kafka 中数据时,Storm 与 Flink 的但并发吞吐情况。因为消费的是 kafka 中的数据,所以吞吐量肯定会收到 kafka 的影响。我们发现性能的瓶颈是在 SourceFunction 上,于是增加了 topic 的 partition 数和 SourceFunction 取数据线程的并发数,但是 MapFunction 的并发数仍然是1.在这种情况下,我们发现 Flink 的瓶颈转移到上游往下游发数据的地方。而 Storm 的瓶颈确是在下游收数据反序列化的地方。
之前的性能分析使我们基于数据传输和数据可靠性的角度出发,单纯的对 计算平台本身进行了性能分析。但实际使用时,task 是肯定有计算逻辑的,这就势必更多的涉及到 CPU,内存等资源问题。我们将来打算做一个智能分析平台,对用户的作业进行性能分析。通过收集到的指标信息,分析出作业的瓶颈在哪,并给出优化建议。
转载于:https://blog.51cto.com/14286418/2408359