我们有一些应用程序使用 Kafka 中的assign(Collection<TopicPartition> partitions)方法直接从 Kafka 消费KafkaConsumer。他们不在group.idKafka中使用。我们想确定这些消费者。里面KafkaAdminClient有一个listConsumerGroups列出消费者的方法。但显然,这个方法只列出了消费者使用subscribe方法,而忽略了消费者使用assign方法。有没有使用方法找到消费者的assign方法?这是应用程序的示例代码:
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._
import java.time.Duration
import java.util
object Test{
def main(args: Array[String]): Unit = {
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("enable.auto.commit", "false")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String,String](props)
consumer.assign(List(new TopicPartition("test",3)).asJava)
// consumer.subscribe(List("test").asJava)
while(true){
val buffer = new util.ArrayList[AnyRef]
val records = consumer.poll(Duration.ofMillis(100))
import collection.JavaConverters._
for (record <- records.asScala) {
buffer.add(record)
}
if(buffer.size() > 0) {
print(buffer)
Thread.sleep(100)
}
}
}
}