使用kafka 7.2,当使用生产者发送消息时,我发现一旦使用它,消息就会在消息的开头带有附加部分。
例如,当向 kafka 发送一个简单的字符串“King Daniel”时,它在字节数组中如下所示:
4B 69 6E 67 20 44 61 6E 69 65 6C
但是当我出于某种原因消费它时,我得到:
00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C
哪个是字符串“........ֲִ.|King Daniel”
所以我在消息的开头还有 12 个字符。这是某种标题吗?我怎样才能得到我的原始信息?
这是我的消费者代码:
public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
long offset = 0;
while (true) {
// create a fetch request for partition 0, current offset, and
// fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
ByteBuffer payload = msg.message().payload();
writer.writeToFile(payload.array());
// advance the offset after consuming each message
offset = msg.offset();
}
}
} catch (Exception e) {
LOG.error("Error occured while consuming from kafka", e);
}
}
所以我正在将其写入msg.message().payload().array()
一个文件,然后当我打开这个文件时,我可以看到原始内容,并在开头添加了 12 个额外的字符。
我怎样才能得到我确切的原始信息?