0

我可能错过了卡夫卡消费者的观点,但我想做的是:

消费者订阅一个主题,获取该主题内的所有消息并返回一个包含所有这些消息列表的 Future

我为尝试完成此操作而编写的代码是

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}

def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { message =>
    logger.info(s"Consuming ${message.record.value}")
    KafkaMessage(Some(message.record.key()), Some(message.record.value()))
  }
  .buffer(bufferSize, overflowStrategy)
  .runWith(sink)

不过,Future 永远不会返回,它会消耗必要的消息,然后继续重复轮询主题。有没有办法返回 Future 然后关闭消费者?

4

1 回答 1

2

由于 Kafka 用于流数据,因此没有“所有消息”之类的东西,因为新数据可以随时附加到主题。

我想,你可以做两件事:

  1. 检查最后返回了多少条记录poll并终止或
  2. 您需要通过 获取“当前日志结束” endOffsets,并将其与每个分区的最新记录的偏移量进行比较。如果两者都匹配,那么您可以返回。

第一种方法更简单,但可能有缺点,它不如第二种方法可靠。从理论上讲,即使有可用记录(即使发生这种情况的可能性不是很高),轮询也可能返回零记录。

不确定如何在 Scala 中表达这种终止条件(因为我对 Scala 不是很熟悉)。

于 2017-03-07T01:33:36.953 回答