9

我是 Kafka、序列化和 JSON 的新手

我想要的是生产者通过 kafka 发送 JSON 文件,消费者以原始文件形式使用和使用 JSON 文件。

我能够得到它,因此 JSON 转换为字符串并通过字符串序列化器发送,然后消费者将解析字符串并重新创建 JSON 对象,但我担心这不是有效的或正确的方法(可能会丢失字段类型对于 JSON)

所以我考虑制作一个 JSON 序列化程序并将其设置在我的生产者的配置中。

我在这里使用了 JsonEncoder:Kafka:编写自定义序列化程序

但是当我现在尝试运行我的生产者时,似乎在编码器的 toBytes 函数中,try 块永远不会像我想要的那样返回任何东西

try {
            bytes = objectMapper.writeValueAsString(object).getBytes();

        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }

似乎objectMapper.writeValueAsString(object).getBytes(); 获取我的 JSON obj ( {"name":"Kate","age":25}) 并将其转换为空,

这是我的制片人的运行功能

List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();   

    JSONObject record = new JSONObject();

    record.put("name", "Kate");
    record.put("age", 25);

    msgList.add(new KeyedMessage<String, JSONObject>(topic, record));

    producer.send(msgList);

我错过了什么?我的原始方法(转换为字符串并发送然后重建 JSON obj)可以吗?还是不是正确的方法?

谢谢!

4

2 回答 2

6

嗯,你为什么害怕序列化/反序列化步骤会导致数据丢失?

您可以选择使用包含在Confluent 的 Schema Registry中的 Kafka JSON 序列化程序,它是免费的开源软件(免责声明:我在 Confluent 工作)。它的测试套件提供了一些示例来帮助您入门,更多细节在serializers 和 formatters中描述。这个 JSON 序列化程序和模式注册表本身的好处是它们为 Kafka 提供了与生产者和消费者客户端的透明集成。除了 JSON,如果您需要,还支持 Apache Avro。

恕我直言,就开发人员的便利性和在 JSON 中与 Kafka 交谈时的易用性而言,此设置是最佳选择之一——当然是 YMMV!

于 2015-10-06T06:50:05.493 回答
1

我建议将您的 JSON 事件字符串转换为字节数组,例如:

字节[] eventBody = event.getBody();

这将提高您的性能,并且 Kafka Consumer 还提供 JSON 解析器,这将帮助您获取 JSON。
如果需要任何进一步的信息,请告诉我。

于 2015-10-05T11:32:19.613 回答