我在 DataFlow 上运行的 Apache Beam 管道中使用 KafkaIO 无界源。以下配置对我有用
Map<String, Object> kafkaConsumerConfig = new HashMap<String, Object>() {{
put("auto.offset.reset", "earliest");
put("group.id", "my.group.id");
}};
p.apply(KafkaIO.<String, String>read()
.withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
.withConsumerConfigUpdates(kafkaConsumerConfig)
.withTopic("my.topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withMaxNumRecords(10)
.withoutMetadata())
// do something
现在,由于我对主题中的消息有一个 protobuf 定义,我想用它来转换 Java 对象中的 kafka 记录。
以下配置不起作用,需要编码器:
p.apply(KafkaIO.<String, Bytes>read()
.withBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
.withConsumerConfigUpdates(kafkaConsumerConfig)
.withTopic("my.topic")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(BytesDeserializer.class)
.withMaxNumRecords(10)
.withoutMetadata())
不幸的是,我找不到正确的 Value Deserializer + Coder 组合,也无法在文档中找到类似的示例。你有在 Apache Beam 中使用 Protobuf 和 Kafka 源代码的工作示例吗?