需求
从指定时间戳(比如 2 小时)开始消费 Kafka 数据
思路
我们知道通过 Kafka 的 API 可以得到指定时间戳对应数据所在的 segment 的起始 offset。那么就可以通过这个功能来粗略的实现需求。
实现
我们知道 KafkaUitls.createDirectStream
这个接口可以指定起始点的 offset,那么我们需要做的就变成如下三步:
- 获取
topic
对应的 TopicAndPartitions
,得到当前 topic 有多少 partition
- 从 Kafka 获取每个 partition 指定时间戳所在 segment 的起始 offset
- 将步骤 2 中的 offset 作为参数传入
createDirectStream
即可
通过查看源码,我们知道步骤 1 和步骤 2 中的功能在 org.apache.spark.streaming.kafka.KafkaCluster
中都已经有现成的函数了:getPartitions
和 getLeaderOffsets
,分别表示获取指定 topic 的 partition 以及获取 partition 指定时间戳所在的 segment 的起始 offset,那么我们需要做的就是如何调用这两个函数实现我们的功能了。
我们知道 KafkaCluster
的作用域是 private[spark]
所以我们需要在自己的代码中使用 package org.apache.spark(.xxx ... .yyy)
(小括号中表示可以省略)来限定自己的代码,因此我们可以将步骤 1 和步骤 2 中的功能实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package org.apache.spark.streaming.kafka ...... //省略其他不相关的代码 def getPartitions(kafkaParams: Map[String, String], topics: Set[String]): Either[Err, Set[TopicAndPartition]] = { val kc = new KafkaCluster(kafkaParams) kc.getPartitions(topics) //我们可以在这里处理错误,也可以将错误继续往上传递 } def getLeaderOffsets(kafkaParams: Map[String, String], topicAndPartitions: Set[TopicAndPartition], before: Long) : Map[TopicAndPartition, Long] = { val kc = new KafkaCluster(kafkaParams) val leaderOffsets = kc.getLeaderOffsets(topicAndPartitions, before) if (leaderOffsets.isLeft) { //在本函数内部处理错误,如果有错误抛出异常 throw new RuntimeException(s"### Exception when MTKafkaUtils#getLeaderOffsets ${leaderOffsets.left.get} ###") } leaderOffsets.right.get.map { case (k, v) => (k, v.offset)} //将 Map[TopicAndPartition, LeaderOffset] 转变为 Map[TopicAndPartition, Long](Long 为对应 partition 的 offset,从 LeaderOffset 中获取) }
|
步骤 3 直接传入参数即可,就可以从指定时间戳开始消费 Kafka 数据了