1

我正在尝试通过使用 KafkaConsumer.assign(partition)、KafkaConsumer.seek(partition, offset) 来实现一种随机访问来自 Kafka 的消息的方法。然后阅读一条消息的民意调查。

但是在这种情况下,我每秒无法收到超过 500 条消息。相比之下,如果我“订阅”分区,我将获得 100,000+ msg/sec。(@1000 字节消息大小)

我试过了:

  1. Broker、Zookeeper、Consumer 在同一台主机上和不同主机上。(不使用复制)
  2. 1 和 15 分区
  3. “server.properties”中的默认线程配置并增加到 20(io 和网络)
  4. 单个消费者每次分配到不同的分区,每个分区一个消费者
  5. 单线程消费多线程消费(调用多个不同的消费者)
  6. 在两个代理上添加两个代理和一个带有分区的新主题
  7. 启动多个 Kafka 消费者进程
  8. 更改消息大小 5k、50k、100k -

在所有情况下,我得到的最小值是~200 msg/sec。如果我使用 2-3 个线程,则最大值为 500。但是在上面,使“.poll()”调用花费的时间越来越长(从单个线程的 3-4 毫秒到 10 个线程的 40-50 毫秒)。

我幼稚的 kafka 理解是消费者打开与代理的连接并发送请求以检索其日志的一小部分。虽然所有这些都涉及一些延迟,并且检索一批消息会更好 - 我想它会随着所涉及的接收器数量而扩展,但会增加运行消费者的 VM 和服务器上的服务器使用率。运行代理的虚拟机。但两人都闲着。

所以显然在代理端发生了一些同步,但我不知道这是由于我使用 Kafka 还是由于使用 .seek 的一些固有限制

我会欣赏一些关于我是否应该尝试其他东西的提示,或者这就是我所能得到的。

4

1 回答 1

4

Kafka 是设计上的流媒体平台。这意味着已经开发了很多很多东西来加速顺序访问。批量存储消息只是一回事。当您使用时,poll()您会以这种方式使用 Kafka,而 Kafka 会尽力而为。随机访问Kafka 没有设计的。

如果你想快速随机访问分布式大数据,你会想要别的东西。例如,像 Cassandra 这样的分布式数据库或像 Hazelcast 这样的内存系统。
此外,您可能希望将 Kafka 流转换为另一个允许您使用顺序方式的流。

于 2019-04-12T11:10:19.303 回答