我使用 Spark 作为批处理来处理来自 kafka 的日志。在每个周期中,我的代码应该得到任何到达 kafka 消费者的东西。但是,我想限制每个周期从 kafka 获取的数据量。假设 5 GB 或 500000 条日志行..
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
WRITE OFFSETS TO DISK
return rdd
while True:
host = "localhost:9092"
offset = OffsetRange(topic, 0, fromOffset, untilOffset)
kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
kafka_content.transform(storeOffsetRanges)
RDD TRANSFORMATIONS..
如果驱动程序失败,我会将偏移量存储在内存和磁盘中。但是我怎样才能强加这些 kafka 偏移量来限制每个周期的最大数据量呢?卡夫卡偏移范围的单位是什么?
提前致谢!