0

有一个 nodejs Kafka 生产者将文件内容发送到 Kafka。当 Kafka 消费者使用来自 Kafka 的消息时,它看起来像 -

{"type":"Buffer","data":[91,13 .....]

当我m.message.value.toString('utf8')在 nodejs Kafka 消费者中使用时,它会打印实际消息。但我需要在 java Kafka 消费者中消费。我尝试过 property.put("value.serializer.encoding", "utf8")new String(consumerRecord.value())仍然打印 { "type":"Buffer","data":[91,13 .....]。我的问题是如何在 java 中使用由 nodejs Kafka 生产者生成的字符串格式的消息。

4

1 回答 1

0

您应该在生产者和消费者 API 调用中使用正确的键和值序列化程序。

Kafka 提供了一些默认的键和值序列化器,如 StringSerializer 等。

用于生成和使用字符串消息的字符串序列化程序。

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

例如,您可以参考生产者和消费者文档。

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/消费者/KafkaConsumer.html

模式注册表 当您从 nodejs 发送消息并使用 kafka 消费者使用该消息时,您需要在 java kafka 消费者中提供消息反序列化器的相同键和值。

处理这个不兼容问题的一种选择是使用模式注册表来使用 kafka avro 存储我们的消息模式。然后我们可以在 nodejs kafka producer 和 java kafka consumer 中使用该模式来消费消息。

带有 avro 示例的Nodejs kafka 生产者

这是nodejs kafka avro的一个例子 在此处输入图像描述

Java Kafka 消费者使用模式注册表示例

于 2019-03-11T06:06:39.707 回答