1

我已经从组中实现了一个 Kafka 高级消费者:'org.apache.kafka',名称:'kafka_2.10',版本:Java 中的'0.9.0.1'。

我注意到 consumerIterator 是线程阻塞的。它的 hasNext() 不会返回,直到有新的 msg 来消费。我使用 consumer.timeout.ms 来克服这个问题。但有时我需要每当用户关闭时,消费者应该关闭。由于线程正忙于在 consumerIterator 中等待,我将无法关闭。

有没有办法克服 Kafka 消费者中的线程阻塞?我可以动态更改 kafka 属性值吗?

4

0 回答 0