0

我在使用 KafkaConsumer.poll(duration timeout) 时遇到了困难,其中它无限期地运行并且永远不会退出该方法。了解这可能与连接有关,有时我看到它有点不一致。如果poll停止响应,我该如何处理?下面给出的是来自KafkaConsumer.poll()的片段

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

我从这里调用上述内容:

Duration timeout = Duration.ofSeconds(30);
    while (true) {
        final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
        System.out.println("record count is" + records.count());
}

我收到以下错误:

org.apache.kafka.common.errors.SerializationException:在偏移量 2 处反序列化分区的键/值时出错。如果需要,请寻找过去的记录以继续消费。

4

2 回答 2

1

在尝试解决上述问题时,我偶然发现了一些有用的信息。我将提供一段应该能够处理此问题的代码,但在此之前,了解导致此问题的原因很重要。

在向 Apache Kafka 生成或使用消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是 Avro 模式。如果向 Kafka 生成的消息与该消息模式冲突,则会对消费产生影响。

在消费记录的方法中在您的消费者主题中添加以下代码 -

请记住导入以下包:

导入 org.apache.kafka.common.TopicPartition;
导入 org.jsoup.SerializationException;

try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = null;
            try {
                records = consumer.poll(10000);
            } catch (SerializationException e) {
                String s = e.getMessage().split("Error deserializing key/value 
for partition ")[1].split(". If needed, please seek past the record to 
continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at") . 
   [0]);

                TopicPartition topicPartition = new TopicPartition(topics, 
 partition);
                //log.info("Skipping " + topic + "-" + partition + " offset " 
 + offset);
                consumer.seek(topicPartition, offset + 1);
            }


            for (ConsumerRecord<String, GenericRecord> record : records) {

                System.out.printf("value = %s \n", record.value());


            }

        }


    } finally {
        consumer.close();
    }
于 2020-02-17T01:04:57.620 回答
0

我在设置测试环境时遇到了这个问题。

在代理上运行以下命令会按预期打印出存储的记录:

bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning

原来是Kafka服务器配置错误。要从外部 IP 地址连接,listeners必须在 中具有有效值kafka/config/server.properties,例如

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
于 2021-10-14T03:48:35.627 回答