0

我在 DataFlow 上运行的 Apache Beam 管道中使用 KafkaIO 无界源。以下配置对我有用

Map<String, Object> kafkaConsumerConfig  = new HashMap<String, Object>() {{
  put("auto.offset.reset", "earliest");
  put("group.id", "my.group.id");
}};

p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())
// do something

现在,由于我对主题中的消息有一个 protobuf 定义,我想用它来转换 Java 对象中的 kafka 记录。

以下配置不起作用,需要编码器:

 p.apply(KafkaIO.<String, Bytes>read()
        .withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .withConsumerConfigUpdates(kafkaConsumerConfig)
        .withTopic("my.topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(BytesDeserializer.class)
        .withMaxNumRecords(10)
        .withoutMetadata())

不幸的是,我找不到正确的 Value Deserializer + Coder 组合,也无法在文档中找到类似的示例。你有在 Apache Beam 中使用 Protobuf 和 Kafka 源代码的工作示例吗?

4

0 回答 0