我正在尝试通过使用 KafkaConsumer.assign(partition)、KafkaConsumer.seek(partition, offset) 来实现一种随机访问来自 Kafka 的消息的方法。然后阅读一条消息的民意调查。
但是在这种情况下,我每秒无法收到超过 500 条消息。相比之下,如果我“订阅”分区,我将获得 100,000+ msg/sec。(@1000 字节消息大小)
我试过了:
- Broker、Zookeeper、Consumer 在同一台主机上和不同主机上。(不使用复制)
- 1 和 15 分区
- “server.properties”中的默认线程配置并增加到 20(io 和网络)
- 单个消费者每次分配到不同的分区,每个分区一个消费者
- 单线程消费多线程消费(调用多个不同的消费者)
- 在两个代理上添加两个代理和一个带有分区的新主题
- 启动多个 Kafka 消费者进程
- 更改消息大小 5k、50k、100k -
在所有情况下,我得到的最小值是~200 msg/sec。如果我使用 2-3 个线程,则最大值为 500。但是在上面,使“.poll()”调用花费的时间越来越长(从单个线程的 3-4 毫秒到 10 个线程的 40-50 毫秒)。
我幼稚的 kafka 理解是消费者打开与代理的连接并发送请求以检索其日志的一小部分。虽然所有这些都涉及一些延迟,并且检索一批消息会更好 - 我想它会随着所涉及的接收器数量而扩展,但会增加运行消费者的 VM 和服务器上的服务器使用率。运行代理的虚拟机。但两人都闲着。
所以显然在代理端发生了一些同步,但我不知道这是由于我使用 Kafka 还是由于使用 .seek 的一些固有限制
我会欣赏一些关于我是否应该尝试其他东西的提示,或者这就是我所能得到的。