5

我是 Apache Kafka 的新手,正在探索 SimpleConsumer 以读取来自该主题的消息。

我使用下面的代码来做同样的事情,

FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, 1024).build();
FetchResponse fetchResponse;
try {
     fetchResponse = consumer.fetch(fetchRequest);
 } catch (Exception e) {}

这会读取特定分区中的所有可用消息;我想设置要阅读的最大消息数。在这个阶段有没有办法做到这一点?当队列中有大量消息时,我不希望所有消息都落在 JVM 堆中。

另一个问题,

以下代码返回一个 ByteBufferMessageSet。

fetchResponse.messageSet(topic, partitionId);

这是否意味着,并非所有可用消息都实际进入内存?

4

2 回答 2

3

虽然您不能限制消息的数量,但您可以限制每个请求的每个主题分区接收的字节数。但是,这应该作为配置设置完成,而不是作为消费者实现代码的一部分。Kafka 消费者配置文档说您可以指定读取的最大字节数为socket.receive.buffer.bytes. 这应该允许您更细粒度地控制 Kafka 消息在 JVM 堆中究竟占用了多少空间。请注意,此值必须等于或大于代理上的最大消息大小,否则生产者可能会发送太大而无法使用的消息。

于 2015-03-11T22:00:26.747 回答
2
max.poll.records

在一次 poll() 调用中返回的最大记录数。 https://kafka.apache.org/documentation/#consumerconfigs

于 2019-10-23T14:50:45.843 回答