1

我是 kafka 流的新手,我想阅读一个主题并使用 kafka 流 api 在一个新主题中写入其中的一部分。我的键是字符串,值是 Avro 是否有我可以使用的文档/示例?

编辑 :

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
    final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
    newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();

在 SUB_TOPIC 我有:

键:{“ID”:“145”} 时间戳:2019 年 3 月 14 日 17:52:23.43 偏移量:12 分区:0

我的输入主题:

{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null,\"DENOMINATION\":null,\"SIREN_SIRET\":null} ,\"new\":{\"ROW_ID\":\"170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\",\ "DENOMINATION\":\"1-******\",\"SIREN_SIRET\":null}}" }, "TYPE_ACTION": { "string": "INSERT" } }

如何在新主题中添加 Body 中的其他字段?例子 :

{“ID”:“145”,“TIMESTAMP”:1552585938545,“WEEK”:“\u0000”,“SOURCE”:{“string”:“TMP”},“BODY”:{“string”:“{\ "operation_type\":\"INSERT\",\"old\":{\"ROW_ID\":null,\"LAST_UPD\":null},\"new\":{\"ROW_ID\":\" 170309-********\",\"LAST_UPD\":\"2019-03-14T17:52:18\"}}" }, "TYPE_ACTION": { "string": "INSERT" } }

4

1 回答 1

1

您可以简单地将主题作为流使用并使用 .map()/.mapValues() 函数修改值/键值。

示例:假设您想从 avro 记录中选择一列并发布到新的输出主题。

// If you are using Schema registry, make sure to add the schema registry url 
// in streamConfiguration. Also specify the AvroSerde for VALUE_SERDE

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("inputTopic");
final KStream<String, String> newStream = userProfiles.mapValues(value -> value.get("fieldName").toString());
subStream.to("outputTopic",Produced.with(Serdes.String(),Serdes.String());
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

此外,您可以查看 github 上的示例:
https://github.com/confluentinc/kafka-streams-examples/blob/5.1.2-post/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample。爪哇

于 2019-03-14T14:45:43.313 回答