9

使用kafka时,我可以通过设置我的 kafka 生产者的 kafka.compression.codec 属性来设置编解码器。

假设我在生产者中使用 snappy 压缩,当使用一些 kafka 消费者消费来自 kafka 的消息时,我应该做些什么来解码来自 snappy 的数据,还是它是 kafka 消费者的一些内置功能?

相关文档中,我找不到与 kafka 消费者中的编码相关的任何属性(它仅与生产者有关)。

有人可以清除这个吗?

4

2 回答 2

15

根据我的理解,解压缩由消费者自己负责。正如他们的官方wiki页面中提到的 The consumer iterator transparently decompresses compressed data and only returns an uncompressed message

本文所述,消费者的工作方式如下

消费者拥有后台“获取器”线程,它们连续从代理中批量获取 1MB 的数据,并将其添加到内部阻塞队列中。消费者线程从这个阻塞队列中取出数据,解压缩并遍历消息

而且在端到端批量压缩下的文档页面中,它写道

一批消息可以聚集在一起压缩并以这种形式发送到服务器。这批消息会以压缩的形式写入,并且会在日志中保持压缩状态,只会被消费者解压。

所以看起来解压缩部分是在消费者自己处理的,你需要做的就是compression.codec在创建生产者时使用 ProducerConfig 属性提供有效/支持的压缩类型。我找不到任何示例或解释,它说明了消费者端的任何减压方法。如果我错了,请纠正我。

于 2013-11-11T09:22:47.500 回答
0

我对 v0.8.1 有同样的问题,除了说消费者应该“透明地”解压缩它从未做过的压缩数据之外,Kafka 中的这种压缩解压缩记录很少。

在 Kafka 网站中使用ConsumerIterator的示例高级消费者客户端仅适用于未压缩的数据。在 Producer 客户端中启用压缩后,消息永远不会进入以下“while”循环。希望他们应该尽快解决这个问题,或者他们不应该声称这个功能,因为一些用户可能使用 Kafka 来传输需要批处理和压缩功能的大尺寸消息。

ConsumerIterator <byte[], byte[]> it = stream.iterator();
while(it.hasNext())
{
   String message = new String(it.next().message());
}
于 2014-10-09T00:33:24.330 回答