0

我使用 Spark 作为批处理来处理来自 kafka 的日志。在每个周期中,我的代码应该得到任何到达 kafka 消费者的东西。但是,我想限制每个周期从 kafka 获取的数据量。假设 5 GB 或 500000 条日志行..

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    WRITE OFFSETS TO DISK
    return rdd

while True:
    host = "localhost:9092"
    offset = OffsetRange(topic, 0, fromOffset, untilOffset)
    kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
    kafka_content.transform(storeOffsetRanges)
    RDD TRANSFORMATIONS..

如果驱动程序失败,我会将偏移量存储在内存和磁盘中。但是我怎样才能强加这些 kafka 偏移量来限制每个周期的最大数据量呢?卡夫卡偏移范围的单位是什么?

提前致谢!

4

1 回答 1

0

Kafka 偏移量单位是消息。在每个周期中,您最多会收到untilOffest - fromOffset来自 Kafka 的消息。但是数据只会从一个主题分区中读取,因此如果您的主题有更多分区,那么应用程序将丢失一些日志行。

作为替代方案,您可以尝试使用 kafka 直接方法进行火花流式传输。使用这种方法,您将摆脱while True,您将使用可选的背压机制基于时间(不是固定的偏移量)在微批处理中处理日志行。然后您可以省略在内存中保存偏移量(流式处理将处理它),但在驱动程序重新启动的情况下仍然需要将它们保存到磁盘(参见fromOffsetsKafkaUtils.createDirectStream

于 2017-01-27T18:32:26.913 回答