我一直在做通过卡夫卡消费和生产数据的火花流工作。我用的是directDstream,所以我必须自己管理offset,我们用redis来读写offset。现在有个问题,当我启动我的客户端时,我的客户端需要从redis中获取offset,而不是kafka中存在的offset本身。如何显示我编写我的代码?现在我已经在下面编写了我的代码:
kafka_stream = KafkaUtils.createDirectStream(
ssc,
topics=[config.CONSUME_TOPIC, ],
kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
"auto.offset.reset": "largest"},
fromOffsets=read_offset_range(config.OFFSET_KEY))
但我认为 fromOffsets 是火花流客户端启动时的值(来自 redis),而不是在其运行期间。谢谢您的帮助。