我正在尝试使用 akka 流缓冲区来提高流的吞吐量,我想知道它如何应用于 Kafka
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
尤其是,
val kafkaSource =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.buffer(10000, OverflowStrategy.backpressure)
关于底层 Kafka API,这里到底发生了什么?
我在底层 Kafka 客户端上有以下配置:
.withProperty(AUTO_OFFSET_RESET_CONFIG, offsetReset)
.withProperty(MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs.toString)
.withProperty(SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs.toString)
.withProperty(HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs.toString)
.withProperty(FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMs.toString)
.withProperty(MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
.withProperty(FETCH_MAX_BYTES_CONFIG, maxPollRecords.toString)
.withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPollRecords.toString)
因此我有一个MAX_POLL_RECORDS_CONFIG
,FETCH_MAX_BYTES_CONFIG
和MAX_PARTITION_FETCH_BYTES_CONFIG
我想知道的是缓冲区将如何相对于底层客户端上配置的获取进行播放。
- 是否
Consumer.committableSource
在其自己的 Actor 中实现,并通过其缓冲区从底层 Kafka 客户端接收消息?假设底层客户端被配置为获取多达一百万条消息,并且 Actor 作为1000
? 那是什么意思?会发生什么?Actor 缓冲区是否覆盖了 Kafka 客户端的轮询请求,或者它是否在其邮箱中获取 Kafka 客户端推送的数据,直到其轮询的结果(在底层客户端中配置的最大值)被传递通过?
我想我最需要知道 Kafka 流的内部和/或显式缓冲区如何与轮询请求的设置进行交互。