本文所有和 kafka 相关操作都基于 Direct 消费模式

在 Spark Streaming 作业中,每个作业会消费一个或多个 topic,但是这些 topic 需要在作业启动之前确定好,在作业运行中不能进行调整,之前修改了源码做到了自适应 topic partition 扩容的情况,但是无法动态调整消费的 topic。现在需要在不重启作业的情况下,动态调整消费的 topic。

方案

回顾之前自适应 partition 调整的方案,落到源码层面最终以 partition 为最小消费单元,而不是 topic。因此动态的调整消费的 topic 在理论上就是可行的 – 假设作业已经消费的 topic 为 A,在自适应 partition 扩容的时候,我们是增加了 A 的某些 partition,那么我们同样可以增加 B topic 的 partition,其中 B topic 是作业之前没有消费的。

动态调整 partition 的方案中,只需要将现在消费的 parition 数不断的对齐现在 kafka 上相应 topic 的 partition 数目即可。动态调整作业消费的 topic 则需要有一个地方存储作业消费的 topic 数目,然后将这个信息周期性的同步给作业即可 – 可以理解前者使用 kafka 作为存储介质,保存了 topic 的 partition 数目。

本方案中,选择 zookeeper 作业作为存储 topic 的介质。希望动态调整 topic 的时候,修改 zookeeper 中对应路径下的节点即可。然后作业定时的访问 zookeeper 的特定路径同步需要消费的 topic 数目即可。示意图如下

实现

方案确定了,直接修改下上次自适应 partition 扩容的代码即可 – 本方案只实现了增加 topic 的功能,当前消费的 topic 不会被删除,如果需要的话可以自行修改源码满足这一点。

DirectKafkaInputStreamcompute 函数开始处添加如下逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val topic = currentOffsets.head._1.topic
var addedTopic : Set[String] = Set()
val topics = getTopicsForJob()
for (i <- topics) {
if (!i.equals(topic)) {
addedTopic = addedTopic + i
}
}
if (addedTopic.nonEmpty) {
val topicLeaders = MTKafkaUtils.getPartitions(kafkaParams, addedTopic)
val largestOffset = MTKafkaUtils.getLeaderOffsets(kafkaParams, topicLeaders, OffsetRequest.LatestTime)
currentOffsets = currentOffsets ++ largestOffset
}

然后增加一个获取所有 topic 的函数,下面 Constants 包中使用了一些常量,自行替换即可

1
2
3
4
5
6
7
8
9
10
11
private def getTopicsForJob() : List[String] = {
val jobName = SparkEnv.get.conf.get(Constants.JOB_PREFIXED_NAME_KEY) //这个是提交 job 时添加的一个参数,用于区分每个作业,会当作 zk 中路径的一级
val zkHostPort: String = "xxxxxxxxx"
val zkClient = new ZkClient(zkHostPort, Constants.DEFAULT_SESSION_TIMEOUT, Constants.DEFAULT_CONNECTION_TIMEOUT, new ZkOffsetSerializer) //ZkOffsetSerializer 自己实现了一个简单的序列化,反序列化类,就用了 String.getBytes 和 new String()
if (!zkClient.exists(s"${Constants.ROOT_PATH_OF_MULTI_TOPIC_PER_JOB}/${jobName}")) {
ZkUtils.updatePersistentPath(zkClient, s"${Constants.ROOT_PATH_OF_MULTI_TOPIC_PER_JOB}/${jobName}", s"${jobName}");
}
zkClient.getChildren(s"${Constants.ROOT_PATH_OF_MULTI_TOPIC_PER_JOB}/${jobName}").asScala.toList
}

Comments