0

我正在使用响应式卡夫卡(akka kafka 流):

https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html

使用reactive-kafka的以下代码的等价物是什么?

import org.apache.kafka.clients.consumer.KafkaConsumer
...
val properties = new Properties()
properties.put("bootstrap.servers", "kafka:9092")

val kafkaConsumer = new KafkaConsumer[String, String](properties)
kafkaConsumer.partitionsFor("my-topic")
4

1 回答 1

1

这目前没有直接在 Reactive Kafka API 中公开。

这可能是因为 Reactive Kafka 专注于以响应方式与基于 Akka-Streams 的消费者/生产者之间的流式事件。

您仍然可以使用官方 Kafka 客户端让消费者从集群中获取元数据。这不需要额外的依赖,因为它是由 Reactive Kafka 引入的。

于 2017-10-02T11:13:27.783 回答