我是 Kafka、序列化和 JSON 的新手
我想要的是生产者通过 kafka 发送 JSON 文件,消费者以原始文件形式使用和使用 JSON 文件。
我能够得到它,因此 JSON 转换为字符串并通过字符串序列化器发送,然后消费者将解析字符串并重新创建 JSON 对象,但我担心这不是有效的或正确的方法(可能会丢失字段类型对于 JSON)
所以我考虑制作一个 JSON 序列化程序并将其设置在我的生产者的配置中。
我在这里使用了 JsonEncoder:Kafka:编写自定义序列化程序
但是当我现在尝试运行我的生产者时,似乎在编码器的 toBytes 函数中,try 块永远不会像我想要的那样返回任何东西
try {
bytes = objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
似乎objectMapper.writeValueAsString(object).getBytes()
; 获取我的 JSON obj ( {"name":"Kate","age":25}
) 并将其转换为空,
这是我的制片人的运行功能
List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();
JSONObject record = new JSONObject();
record.put("name", "Kate");
record.put("age", 25);
msgList.add(new KeyedMessage<String, JSONObject>(topic, record));
producer.send(msgList);
我错过了什么?我的原始方法(转换为字符串并发送然后重建 JSON obj)可以吗?还是不是正确的方法?
谢谢!