现象

Spark Streaming 处理数据过程中遇到 Ran out of messages before reaching ending offset 异常,导致程序一直 hang 住(因为我们希望接上次消费从而不丢失数据)

分析

通过异常栈信息,我们知道异常从 KafkaRDD.scala#211 行抛出,下面是相应代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
206 override def getNext(): R = {
if (iter == null || !iter.hasNext) {
208 iter = fetchBatch
}
210 if (!iter.hasNext) {
211 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
212 finished = true
null.asInstanceOf[R]
} else {
val item = iter.next()
if (item.offset >= part.untilOffset) {
217 assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
finished = true
null.asInstanceOf[R]
} else {
requestOffset = item.nextOffset
messageHandler(new MessageAndMetadata(
part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
}
}
226 }

通过分析,我们知道这个地方是实际从 Kafka 读取数据的逻辑,首先会调用 fetchBatch 函数(208 行),然后进行逻辑判断,数据是否读取完毕,是否发生异常

其中 211 行的异常表示还未读取到 part.untilOffset 但是当前迭代器中没有数据了;217 行表示当前读取的数据如果超过了 part.untilOffset ,那么在这个时候退出当前 batch(offset 从 fromOffset 逐次加一增加的,正常的逻辑肯定会和 part.untilOffset 相等)

我们知道异常从 211 行抛出来的,也知道了异常的最直接原因,那么这个原因是什么造成的呢?

211 行的代码执行了,也就是 210 行的 if 语句未 true,这样的话,207 行的逻辑也应该为 true。这样的话 iter 就是 fetchBatch 返回的迭代器了。接下来我们看看 fetchBatch 的代码

1
2
3
4
5
6
7
8
9
10
11
188 private def fetchBatch: Iterator[MessageAndOffset] = {
189 val req = new FetchRequestBuilder()
190 .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
.build()
192 val resp = consumer.fetch(req)
handleFetchErr(resp)
// kafka may return a batch that starts before the requested offset
resp.messageSet(part.topic, part.partition)
196 .iterator
.dropWhile(_.offset < requestOffset)
}

我们发现 192 行会通过 consumer 从 kafka 获取数据,本次从哪获取数据,以及获取多少分别由 190 行的 topic, partitionkc.config.fetchMessageMaxBytes 指定。我们查看 kc.config.fetchMessageMaxBytes,发现默认使用的是 1M

1
2
3
4
ConsumerConfig.scala
29 val FetchSize = 1024 * 1024
114 val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)

从这里我们知道每次从 kafka 上最多获取 1M 的数据(这也是为什么需要在 KafkaRDD.getNext 函数的开头通过 iter.hasNext() 来判断是否需要调用 fetchBatch

然后看到 fetchBatch 函数对应的 196 行,获取迭代器作为返回值,查看相应代码,跳转到 ByteBufferMessageSet.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
139 override def iterator: Iterator[MessageAndOffset] = internalIterator()
145 private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
......
152 def makeNextOuter: MessageAndOffset = {
// if there isn't at least an offset and size, we are done
if (topIter.remaining < 12)
return allDone()
val offset = topIter.getLong()
val size = topIter.getInt()
if(size < Message.MinHeaderSize)
throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
160 // we have an incomplete message
161 if(topIter.remaining < size)
162 return allDone()
....
185 }

从 161 行我们可以看出,如果读取的消息是一条不完整的,那么本次不处理,默认本次消息读取完成。
上面所有的链条穿起来就抛出了我们文章开始的异常。

1. 从 kafka 读取 1M 的数据(默认大小)
2. 发现读取的数据不完整(这个消息的大小大于 1M),所以本次读取的 迭代器 为空
3. 发现迭代器为空,但是当前的 offset 和 part.untilOffset 不想等,抛出异常

解决方案

通过设置 kafkaParam 的参数 fetch.message.max.bytes 就行了,我们设置成 2M(大于一条数据的最大值即可),就能够运行成功了

Comments