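I am reading data from a Cassandra DB, applying some transformations, and then sending the result to Kafka with the batch .save() method. I am also creating a KafkaProducer to set the properties. Every run fails with the following error:

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic XXXXXXXXXXXXX not present in metadata after 60000 ms.

All configuration and credentials are set. The same code runs fine locally, where no SASL mechanism is in use, but on the cluster it throws the exception above. Please help.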

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

// Standalone producer configured for SASL_SSL with SCRAM-SHA-512
System.setProperty("java.security.auth.login.config", "/apps/xxxx/jaas.conf")
val props = new Properties()
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")
props.put("bootstrap.servers", "xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095")
props.put("ssl.truststore.location", "/home/xxxxx/ocrptrust.jks")
props.put("ssl.truststore.password", "xxxxxxxxx")
props.put("sasl.mechanism", "SCRAM-SHA-512")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
props.put("security.protocol", "SASL_SSL")
val producerConfig = new KafkaProducer[String, String](props)

// Batch write of the DataFrame to Kafka
jsonRead.selectExpr("CAST(householdID AS STRING) AS key", "to_json(struct(*)) AS value")
  .write.format("kafka")
  .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  .option("acks", "all")
  .option("ssl.truststore.location", "/home/xxxxx/ocrptrust.jks")
  .option("ssl.truststore.password", "xxxxxx")
  .option("sasl.mechanism", "SCRAM-SHA-512")
  .option("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
  .option("security.protocol", "SASL_SSL")
  .option("kafka.bootstrap.servers", "xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095")
  .option("topic", "xxxxxxxxxxx")
  .save()
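
One thing that may be worth checking in the write above: Spark's Kafka integration only forwards options whose names start with "kafka." to the underlying producer. Here only kafka.bootstrap.servers carries that prefix, so the security.protocol, sasl.* and ssl.* settings may never reach the producer, and a client talking plaintext to a SASL_SSL listener typically surfaces as a metadata timeout like this one. This is a guess from the snippet, not a confirmed diagnosis. A minimal sketch of adding the prefix follows; toSparkKafkaOptions is a hypothetical helper name, and the values are the placeholders from the question.

```scala
// Producer settings as a plain KafkaProducer would take them.
// All values are the placeholders from the question, not real ones.
val producerSettings: Map[String, String] = Map(
  "bootstrap.servers"       -> "xxxxxx1:9095,xxxxxx2:9095,xxxxx3:9095",
  "security.protocol"       -> "SASL_SSL",
  "sasl.mechanism"          -> "SCRAM-SHA-512",
  "sasl.jaas.config"        -> "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";",
  "ssl.truststore.location" -> "/home/xxxxx/ocrptrust.jks",
  "ssl.truststore.password" -> "xxxxxx",
  "acks"                    -> "all"
)

// Hypothetical helper: prepend "kafka." to every key that does not
// already have it, so Spark passes the setting on to the producer.
def toSparkKafkaOptions(settings: Map[String, String]): Map[String, String] =
  settings.map { case (key, value) =>
    val prefixed = if (key.startsWith("kafka.")) key else s"kafka.$key"
    prefixed -> value
  }

val sparkOptions: Map[String, String] = toSparkKafkaOptions(producerSettings)

// Intended use with the DataFrame from the question (needs the
// spark-sql-kafka-0-10 package on the classpath, so it is left commented out):
// jsonRead.selectExpr("CAST(householdID AS STRING) AS key", "to_json(struct(*)) AS value")
//   .write.format("kafka")
//   .options(sparkOptions)
//   .option("topic", "xxxxxxxxxxx")
//   .save()
```

The serializer settings are left out on purpose: per the Spark Kafka integration guide, Spark sets the key and value serializers itself, and the selectExpr already casts both columns to strings. If the prefix turns out not to be the problem, it may also be worth confirming that the truststore path exists on every executor node and not only on the driver.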