
In Spring-Kafka I want to re-read a Kafka topic from the beginning. Doing this by changing the group.id to something unknown to Kafka works, of course:

@KafkaListener(topics = "sensordata.t")
public void receiveMessage(String message) {
...
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // it still commits though...
    return props;
}
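
(For completeness: the snippet above omits the bootstrap servers, the deserializers and the ConsumerFactory/container wiring. A minimal sketch of that part, assuming a broker on localhost:9092 and String keys/values, both of which are assumptions since neither appears above, could look like this:)

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // the listener container created for @KafkaListener methods uses this factory
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}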

However, starting over by setting the offset to 0 fails:

@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "sensordata.t",
        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void receiveMessage(String message) {
...
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // making the timeout window larger seems to have no influence
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // setting max records to 1 makes no difference
    return props;
}

The error I get:

2016-11-14 14:07:59.018  INFO 8165 --- [           main] c.i.t.s.server.SpringKafkaApplication    : Started SpringKafkaApplication in 4.134 seconds (JVM running for 4.745)
2016-11-14 14:07:59.125  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.125  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group spring8
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-11-14 14:07:59.129  INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group spring8
2016-11-14 14:07:59.338 ERROR 8165 --- [afka-consumer-1] essageListenerContainer$ListenerConsumer : Container exception

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[kafka-clients-0.10.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:939) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:816) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:526) ~[spring-kafka-1.1.1.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92] 

Is anyone familiar with this?

I am using Kafka 0.10.1.0 with:

<properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>

1 Answer


Why did you decide that this is a problem with offset 0? Your stack trace says that the time between your poll() calls was longer than session.timeout.ms:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

What about tuning them properly?
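
For reference, a minimal sketch of what that suggestion amounts to: raise the session timeout and/or shrink the poll batch. The concrete values and the heartbeat.interval.ms setting below are illustrative assumptions, not something stated above; heartbeat.interval.ms only has to stay well below session.timeout.ms.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TunedConsumerProps {

    // With the 0.10.0.x client the whole batch returned by poll() has to be
    // processed before the next poll(), and that gap is bounded by
    // session.timeout.ms, so either raise the timeout or shrink the batch.
    public static Map<String, Object> tunedConsumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");    // illustrative value only
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // assumption: well below the session timeout
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");         // smaller batches, less work per poll()
        return props;
    }
}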

Answered on 2016-11-14T14:39:49.220