我可能错过了卡夫卡消费者的观点,但我想做的是:
消费者订阅一个主题,获取该主题内的所有消息并返回一个包含所有这些消息列表的 Future
我为尝试完成此操作而编写的代码是
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
不过,Future 永远不会返回,它会消耗必要的消息,然后继续重复轮询主题。有没有办法返回 Future 然后关闭消费者?