43

使用 Apache Kafka Java 客户端(0.9),我正在尝试使用Kafka Producer 类向代理发送一长串记录。

异步发送方法会立即返回一段时间,然后在每次调用时开始阻塞一小段时间。大约 30 秒后,客户端开始抛出异常 ( TimeoutException ),并显示消息"Batch expired"

什么情况会导致这个异常被抛出?

4

7 回答 7

61

此异常表明您正在以比发送记录更快的速度排队记录。

当您调用send方法时,ProducerRecord将存储在内部缓冲区中以发送到代理。该方法在ProducerRecord被缓冲后立即返回,无论它是否已发送。

记录被分组为发送到代理的批次,以减少每条消息的传输偷听并增加吞吐量。

将记录添加到批次后,发送该批次有一个时间限制,以确保它已在指定的持续时间内发送。这由 Producer 配置参数request.timeout.ms控制,默认为 30 秒。

如果批处理的排队时间超过了超时限制,则会抛出异常。该批次中的记录将从发送队列中删除。

使用配置参数增加超时限制将允许客户端在过期之前将批处理排队更长的时间。

于 2016-01-14T16:08:34.517 回答
32

我在完全不同的情况下得到了这个例外。

我已经设置了一个动物园管理员 vm、一个代理 vm 和一个生产者/消费者 vm 的迷你集群。我在服务器 (9092) 和 zookeeper (2181) 上打开了所有必要的端口,然后尝试将消息从消费者/发布者 vm 发布到代理。我得到了 OP 提到的异常,但是由于到目前为止我只发布了一条消息(或者至少我尝试过),因此解决方案不能是增加超时或批量大小。所以我搜索并发现这个邮件列表描述了我在尝试从消费者/生产者 vm (ClosedChannelException) 中消费消息时遇到的类似问题:http: //grokbase.com/t/kafka/users/152jsjekrm/having-trouble -with-the-simplest-remote-kafka-config 这个邮件列表中的最后一篇文章实际上描述了如何解决这个问题。

长话短说,如果您同时遇到ChannelClosedException异常Batch Expired,您可能必须在文件中将此行更改为以下内容server.config并重新启动代理:

advertised.host.name=<broker public IP address>

如果没有设置,它会回退到host.name属性(可能两者都没有设置),然后回退到InetAddressJava 类的规范主机名,这当然最终是不正确的,因此会混淆远程节点。

于 2016-02-12T23:38:20.197 回答
3

控制发送到代理之前的时间的参数是linger.ms。其默认值为 0(无延迟)。

于 2016-01-21T20:12:07.580 回答
3

我正在使用 Kafka Java 客户端版本 0.11.0.0。我也开始看到相同的模式无法始终如一地生成大消息。它通过了一些消息,而对其他一些消息则失败了。(尽管通过和失败的消息大小相同)。在我的情况下,每条消息的大小约为 60KB,远高于 Kafka 的默认值batch.size16kB,我linger.ms也设置为默认值 0。这个错误被抛出为Producer 客户端在接收到来自服务器的成功响应之前超时。基本上,在我的代码中,此调用超时:kafkaProd.send(pr).get()。为了解决这个问题,我不得不将 Producer 客户端的默认值request.timeout.ms增加到 60000

于 2017-10-05T09:41:02.923 回答
1

在 docker-compose 中运行的 Kafka 也有类似的问题。我的 docker-compose.yml 设置为

 KAFKA_ADVERTISED_HOST_NAME: kafka
 ports:
        - 9092:9092

但是当我试图从外部码头用骆驼发送消息时

to("kafka:test?brokers=localhost:9092")

我有一个超时异常。我通过添加解决了它

127.0.0.1 kafka

到 Windows\System32\drivers\etc\hosts 文件,然后将我的骆驼网址更改为

to("kafka:test?brokers=kafka:9092")
于 2018-05-08T12:10:29.997 回答
0

我解决了。

我的Kafka部署在一个Docker容器中,容器的网络模式是桥接,主机和容器使用端口映射,我将Kafka服务器的默认端口改为9102。

server.properties 中解决问题的配置项是这两个:listeners Advertisementd.listeners

我尝试了几种组合:

成功:</p>

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

服务器无法启动:

listeners=PLAINTEXT://192.168.0.136:9102
advertised.listeners=PLAINTEXT://192.168.0.136:9102

超时错误:

listeners=PLAINTEXT://:9102
advertised.listeners=PLAINTEXT://:9102
于 2020-12-03T08:08:48.543 回答
-8

创建消费者时,将 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 true。

于 2017-10-02T15:09:53.653 回答