现象
Spark Streaming 处理数据过程中遇到 Ran out of messages before reaching ending offset
异常,导致程序一直 hang 住(因为我们希望接上次消费从而不丢失数据)
分析
通过异常栈信息,我们知道异常从 KafkaRDD.scala#211 行抛出,下面是相应代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 206 override def getNext(): R = { if (iter == null || !iter.hasNext) { 208 iter = fetchBatch } 210 if (!iter.hasNext) { 211 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part)) 212 finished = true null.asInstanceOf[R] } else { val item = iter.next() if (item.offset >= part.untilOffset) { 217 assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part)) finished = true null.asInstanceOf[R] } else { requestOffset = item.nextOffset messageHandler(new MessageAndMetadata( part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder)) } } 226 }
|
通过分析,我们知道这个地方是实际从 Kafka 读取数据的逻辑,首先会调用 fetchBatch
函数(208 行),然后进行逻辑判断,数据是否读取完毕,是否发生异常
其中 211 行的异常表示还未读取到 part.untilOffset 但是当前迭代器中没有数据了;217 行表示当前读取的数据如果超过了 part.untilOffset ,那么在这个时候退出当前 batch(offset 从 fromOffset 逐次加一增加的,正常的逻辑肯定会和 part.untilOffset 相等)
我们知道异常从 211 行抛出来的,也知道了异常的最直接原因,那么这个原因是什么造成的呢?
211 行的代码执行了,也就是 210 行的 if 语句未 true,这样的话,207 行的逻辑也应该为 true。这样的话 iter 就是 fetchBatch 返回的迭代器了。接下来我们看看 fetchBatch 的代码
1 2 3 4 5 6 7 8 9 10 11
| 188 private def fetchBatch: Iterator[MessageAndOffset] = { 189 val req = new FetchRequestBuilder() 190 .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes) .build() 192 val resp = consumer.fetch(req) handleFetchErr(resp) // kafka may return a batch that starts before the requested offset resp.messageSet(part.topic, part.partition) 196 .iterator .dropWhile(_.offset < requestOffset) }
|
我们发现 192 行会通过 consumer 从 kafka 获取数据,本次从哪获取数据,以及获取多少分别由 190 行的 topic
, partition
和 kc.config.fetchMessageMaxBytes
指定。我们查看 kc.config.fetchMessageMaxBytes
,发现默认使用的是 1M
1 2 3 4
| ConsumerConfig.scala 29 val FetchSize = 1024 * 1024 114 val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
|
从这里我们知道每次从 kafka 上最多获取 1M 的数据(这也是为什么需要在 KafkaRDD.getNext
函数的开头通过 iter.hasNext()
来判断是否需要调用 fetchBatch
然后看到 fetchBatch 函数对应的 196 行,获取迭代器作为返回值,查看相应代码,跳转到 ByteBufferMessageSet.scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 139 override def iterator: Iterator[MessageAndOffset] = internalIterator() 145 private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { ...... 152 def makeNextOuter: MessageAndOffset = { // if there isn't at least an offset and size, we are done if (topIter.remaining < 12) return allDone() val offset = topIter.getLong() val size = topIter.getInt() if(size < Message.MinHeaderSize) throw new InvalidMessageException("Message found with corrupt size (" + size + ")") 160 // we have an incomplete message 161 if(topIter.remaining < size) 162 return allDone() .... 185 }
|
从 161 行我们可以看出,如果读取的消息是一条不完整的,那么本次不处理,默认本次消息读取完成。
上面所有的链条穿起来就抛出了我们文章开始的异常。
1. 从 kafka 读取 1M 的数据(默认大小)
2. 发现读取的数据不完整(这个消息的大小大于 1M),所以本次读取的 迭代器 为空
3. 发现迭代器为空,但是当前的 offset 和 part.untilOffset 不想等,抛出异常
解决方案
通过设置 kafkaParam 的参数 fetch.message.max.bytes
就行了,我们设置成 2M(大于一条数据的最大值即可),就能够运行成功了