0

我们有一些应用程序使用 Kafka 中的assign(Collection<TopicPartition> partitions)方法直接从 Kafka 消费KafkaConsumer。他们不在group.idKafka中使用。我们想确定这些消费者。里面KafkaAdminClient有一个listConsumerGroups列出消费者的方法。但显然,这个方法只列出了消费者使用subscribe方法,而忽略了消费者使用assign方法。有没有使用方法找到消费者的assign方法?这是应用程序的示例代码:


import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

import collection.JavaConverters._
import java.time.Duration
import java.util
object Test{
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("enable.auto.commit", "false")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String,String](props)
    consumer.assign(List(new TopicPartition("test",3)).asJava)
   // consumer.subscribe(List("test").asJava)

    while(true){
      val buffer = new util.ArrayList[AnyRef]
      val records = consumer.poll(Duration.ofMillis(100))
      import collection.JavaConverters._
      for (record <- records.asScala) {
        buffer.add(record)
      }
      if(buffer.size() > 0) {
        print(buffer)
        Thread.sleep(100)
      }
    }
  }
}
4

0 回答 0