0

我一直在尝试使用 kafkacat 在主题中查找消息并将其发布回主题中。我们使用 protobuf,因此消息值应该以字节为单位(键可以不同,例如字符串或字节)。但是,我无法发布可以正确反序列化的消息。

我怎么能用 kafkacat 做到这一点?我也愿意使用其他推荐的工具来做到这一点。


示例尝试:

kafkacat -b <broker> -t <topic> -o -10 -e -c 1 -C -K: > test2.txt
cat test2.txt | kafkacat -b <broker> -t <topic> -P -K:

test2.txt 显示:

21aa7e2f-41a1-4972-9108-3057627d53f0:
Y/<protobuf.class.path>i
$21aa7e2f-41a1-4972-9108-3057627d53f0A
DIABETEBG_METERBG300"BG300*QP3687WK02000012????

但是当我使用相同的 kafkacat consumer 命令获取结果时,我只得到最后一行:

DIABETEBG_METERBG300"BG300*QP3687WK02000012????

我认为问题在于消费输出行(可能是有效负载的一部分?)并且生产者将每一行视为一条新消息。

4

1 回答 1

0

生产者将每一行视为一条新消息

这是正确的。

如果您有二进制文件,我建议为此编写代码,因为 kafkacat 假定 UTF8 编码的字符串作为输入

于 2021-02-23T19:57:05.983 回答