我是 kafka 流的新手,我想阅读一个主题并使用 kafka 流 api 在一个新主题中写入其中的一部分。我的键是字符串,值是 Avro 是否有我可以使用的文档/示例?
编辑 :
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
在 SUB_TOPIC 我有:
键:{“ID”:“145”} 时间戳:2019 年 3 月 14 日 17:52:23.43 偏移量:12 分区:0
我的输入主题:
{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null,\"DENOMINATION\":null,\"SIREN_SIRET\":null} ,\"new\":{\"ROW_ID\":\"170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\",\ "DENOMINATION\":\"1-******\",\"SIREN_SIRET\":null}}" }, "TYPE_ACTION": { "string": "INSERT" } }
如何在新主题中添加 Body 中的其他字段?例子 :
{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null},\"new\":{\"ROW_ID\":\" 170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\"}}" }, "TYPE_ACTION": { "string": "INSERT" } }