我正在尝试使用以下命令从 Dataflow(Apache Beam)写入 Confluent Cloud/Kafka:
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
.withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
.withTopic("testtopic").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
在哪里Map<String, Object> props = new HashMap<>();
(即现在为空)
在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'
该集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。
我确实尝试了各种组合来将我从 Confluent Cloud 获得的 APIKey/Secret 传递给props
上面的身份验证,但我找不到有效的设置。