Spark Streaming 自适应上游 kafka topic partition 数目变化
背景
Spark Streaming 作业在运行过程中,上游 topic 增加 partition 数目从 A 增加到 B,会造成作业丢失数据,因为该作业只从 topic 中读取了原来的 A 个 partition 的数据,新增的 B-A 个 partition 的数据会被忽略掉。
思考过程
为了作业能够长时间的运行,一开始遇到这种情况的时候,想到两种方案:
- 感知上游 topic 的 partition 数目变化,然后发送报警,让用户重启
- 直接在作业内部自适应上游 topic partition 的变化,完全不影响作业
方案 1 是简单直接,第一反应的结果,但是效果不好,需要用户人工介入,而且需要删除 checkpoint 文件
方案 2 从根本上解决问题,用户不需要关心上游 partition 数目的变化,但是第一眼会觉得较难实现。
方案 1 很快被 pass 掉,因为人工介入的成本太高,而且实现起来很别扭。接下来考虑方案 2.
Spark Streaming 程序中使用 Kafka 的最原始方式为 KafkaUtils.createDirectStream
通过源码,我们找到调用链条大致是这样的
KafkaUtils.createDirectStream
–> new DirectKafkaInputDStream
–> 最终由 DirectKafkaInputDStream#compute(validTime : Time)
函数来生成 KafkaRDD。
而 KafkaRDD 的 partition 数和 作业开始运行时 topic 的 partition 数一致,topic 的 partition 数保存在 currentOffsets 变量中,currentOffsets 是一个 Map[TopicAndPartition, Long]类型的变量,保存每个 partition 当前消费的 offset 值,但是作业运行过程中 currentOffsets 不会增加 key,就是说不会增加 KafkaRDD 的 partition,这样导致每次生成 KafkaRDD 的时候都使用 作业开始运行时 topic 的 partition 数作为 KafkaRDD 的 partition 数,从而会造成数据的丢失。
解决方案
我们只需要在每次生成 KafkaRDD 的时候,将 currentOffsets 修正为正常的值(往里面增加对应的 partition 数,总共 B-A 个,以及每个增加的 partition 的当前 offset 从零开始)。
- 第一个问题出现了,我们不能修改 Spark 的源代码,重新进行编译,因为这不是我们自己维护的。想到的一种方案是继承 DirectKafkaInputDStream。我们发现不能继承 DirectKafkaInputDStream 该类,因为这个类是使用
private[streaming]
修饰的。 - 第二个问题出现了,怎么才能够继承 DirectKafkaInputDStream,这时我们只需要将希望继承 DirectKafkaInputDStream 的类放到一个单独的文件 F 中,文件 F 使用
package org.apache.spark.streaming
进行修饰即可,这样可以绕过不能继承 DirectKafkaInputDStream 的问题。这个问题解决后,我们还需要修改Object KafkaUtils
,让该 Object 内部调用我们修改后的 DirectKafkaInputDStream(我命名为 MTDirectKafkaInputDStream) - 第三个问题如何让 Spark 调用 MTDirectKafkaInputDStream,而不是 DirectKafkaInputDStream,这里我们使用简单粗暴的方式,将 KafkaUtils 的代码 copy 一份,然后将其中调用 DirectKafkaInputDStream 的部分都修改为 MTDirectKafkaInputDStream,这样就实现了我们的需要。当然该文件也需要使用
package org.apache.spark.streaming
进行修饰 - 第二个和第三个问题的解决方案在 中国 Spark 技术峰会 2016 上,广点通的 林立伟 有提及,后续会进行尝试
总结下,我们需要做两件事
- 修改 DirectKafkaInputDStream#compute 使得能够自适应 topic 的 partition 变更
- 修改 KafkaUtils,使得我们能够调用修改过后的 DirectKafkaInputDStream
代码
|
|
在修改过后的 KafkaUtils 文件中,将所有的 DirectKafkaInputDStream
都替换为 MTDirectKafkaInputDStream
即可