使用kafka时,我可以通过设置我的 kafka 生产者的 kafka.compression.codec 属性来设置编解码器。
假设我在生产者中使用 snappy 压缩,当使用一些 kafka 消费者消费来自 kafka 的消息时,我应该做些什么来解码来自 snappy 的数据,还是它是 kafka 消费者的一些内置功能?
在相关文档中,我找不到与 kafka 消费者中的编码相关的任何属性(它仅与生产者有关)。
有人可以清除这个吗?
根据我的理解,解压缩由消费者自己负责。正如他们的官方wiki页面中提到的
The consumer iterator transparently decompresses compressed data and only returns an uncompressed message
如本文所述,消费者的工作方式如下
消费者拥有后台“获取器”线程,它们连续从代理中批量获取 1MB 的数据,并将其添加到内部阻塞队列中。消费者线程从这个阻塞队列中取出数据,解压缩并遍历消息
而且在端到端批量压缩下的文档页面中,它写道
一批消息可以聚集在一起压缩并以这种形式发送到服务器。这批消息会以压缩的形式写入,并且会在日志中保持压缩状态,只会被消费者解压。
所以看起来解压缩部分是在消费者自己处理的,你需要做的就是compression.codec
在创建生产者时使用 ProducerConfig 属性提供有效/支持的压缩类型。我找不到任何示例或解释,它说明了消费者端的任何减压方法。如果我错了,请纠正我。
我对 v0.8.1 有同样的问题,除了说消费者应该“透明地”解压缩它从未做过的压缩数据之外,Kafka 中的这种压缩解压缩记录很少。
在 Kafka 网站中使用ConsumerIterator的示例高级消费者客户端仅适用于未压缩的数据。在 Producer 客户端中启用压缩后,消息永远不会进入以下“while”循环。希望他们应该尽快解决这个问题,或者他们不应该声称这个功能,因为一些用户可能使用 Kafka 来传输需要批处理和压缩功能的大尺寸消息。
ConsumerIterator <byte[], byte[]> it = stream.iterator();
while(it.hasNext())
{
String message = new String(it.next().message());
}