1

我正在尝试使用以下命令从 Dataflow(Apache Beam)写入 Confluent Cloud/Kafka:

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
                                .withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
                                .withTopic("testtopic").withKeySerializer(StringSerializer.class)
                                .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

在哪里Map<String, Object> props = new HashMap<>();(即现在为空)

在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'

该集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。

我确实尝试了各种组合来将我从 Confluent Cloud 获得的 APIKey/Secret 传递给props上面的身份验证,但我找不到有效的设置。

4

1 回答 1

3

找到了一个解决方案,感谢@RobinMoffatt 问题下方评论中的指示

这是我现在的设置:

Map<String, Object> props = new HashMap<>()

props.put("ssl.endpoint.identification.algorithm", "https");
props.put("sasl.mechanism", "PLAIN");
props.put("request.timeout.ms", 20000);
props.put("retry.backoff.ms", 500);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<APIKEY>\" password=\"<SECRET>\";");
props.put("security.protocol", "SASL_SSL");

kafkaKnowledgeGraphKVRecords.apply("Write to Kafka-TESTTOPIC", KafkaIO.<String, String>write()
    .withBootstrapServers("<CLUSTER>.confluent.cloud:9092")
    .withTopic("test").withKeySerializer(StringSerializer.class)
    .withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));

我错了的关键是sasl.jaas.config(注意;最后的!)

于 2020-01-17T09:00:23.543 回答