如何使用消息批处理或带有 pykafka 的缓冲区生成 kafka 主题。我的意思是一个生产者可以在一个生产过程中产生许多消息。我知道使用消息批处理或缓冲区消息的概念,但我不知道如何实现它。我希望有人可以在这里帮助我
问问题
3749 次
2 回答
1
PyKafka 透明地处理生产者中的消息批处理——您不必做任何特殊的事情来确保消息是批量生成的。该类Producer
提供了一堆配置选项,让您自定义批处理行为。文档中提供了这些选项的完整列表,但其中一些最重要的是:
max_queued_messages
- 当您收到produce()
比这更多的消息时,立即发送该批次min_queued_messages
- 当你produce()
至少有这么多消息时,发送批处理linger_ms
- 自上一批以来已经过了这么长时间,发送该批
于 2017-09-14T18:59:39.163 回答
0
只需使用send()
方法。您不需要自己管理它。
send() 是异步的。当被调用时,它将记录添加到待处理记录发送的缓冲区并立即返回。这允许生产者将单个记录批处理在一起以提高效率。
您的任务只是为此配置两个道具:batch_size和linger_ms。
生产者为每个分区维护未发送记录的缓冲区。这些缓冲区的大小由“batch_size”配置指定。使这个更大可以导致更多的批处理,但需要更多的内存(因为我们通常会为每个活动分区使用这些缓冲区之一)。
这两个道具将通过以下方式完成:
一旦我们获得了一个分区的 batch_size 值的记录,无论这个设置如何,它都会立即发送,但是如果我们为这个分区积累的字节数少于这个数量,我们将“徘徊”指定的时间,等待更多的记录出现。
于 2017-08-24T13:29:46.763 回答