是否有可能使用 Akka Streams Kafka 获得关于 Kafka 主题的最后一条消息?我正在创建一个监听 Kafka 主题的 websocket,但目前它在我连接时检索所有先前的未红色消息。这可以加起来很多消息,所以我只对最后一条消息+任何未来消息感兴趣。(或只有未来的消息)
来源:
def source(): Flow[Any, String, NotUsed] = {
val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}
消费者设置:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
val deserializer = new StringDeserializer()
val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
.getOrElse(Configuration.empty)
ConsumerSettings(config.underlying, deserializer, deserializer)
.withBootstrapServers(kafkaUrl)
.withGroupId(GroupId)
}
我试过添加设置ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
哪个应该“自动将偏移重置为最新的偏移”,但它似乎没有任何效果。