1

我在设置使用 TLS从spark连接到 Kafka 所需的参数时遇到问题。这是我目前的做法:

spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<my url>:tls port")
            .option("security.protocol", "SSL")
            .option("ssl.key.password", "<key password>")
            .option("ssl.keystore.location", "<keystore location>")
            .option("ssl.keystore.password", "<keystore password>")
            .option("ssl.endpoint.identification.algorithm", "")
            .option("subscribe",  "<my topic>")
            .load()

我还尝试使用前缀kafka.并将配置包含在我的 spark 提交中(使用--conf或包含.jks文件位置--files)。如果我使用spark.read而不是spark.readStream.

问题可以在日志中表示,其中我设置的参数仍然为空或继续具有默认值。此外,连接失败,就像我在不使用 TLS 证书的情况下尝试连接时一样(这是我当前的 kafka 所必需的):

{"Application":"My test application" ,"level": "INFO ", "timestamp": "2021-05-20 15:33:07,485", "classname": "org.apache.kafka.clients.consumer.ConsumerConfig", "body": "ConsumerConfig values: 
        [...]
        security.protocol = PLAINTEXT
        [...]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        [...]
        ssl.keystore.location = null
        ssl.keystore.password = null
        [...]
{"Application":"My test application" ,"level": "WARN ", "timestamp": "2021-05-20 15:33:08,669", "classname": "org.apache.kafka.clients.NetworkClient", "body": "[Consumer clientId=consumer-sample_table-1, groupId=sample_table] Bootstrap broker <my ip> (id: -1 rack: null) disconnected"}
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

目前,我正在使用spark 3.0.0and scala 2.12。另外,我正在使用以下命令提交作业:

$SPARK_HOME/spark-submit --name "My application" \
--master yarn \
--deploy-mode client \
--class <main class> \
application.jar

有没有人有类似的问题?谢谢你。

更新 使用以下选项解决了我的问题:

spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<my url>:tls port")
            .option("kafka.security.protocol"", "SSL")
            .option("kafka.ssl.keystore.location", "<keystore location>")
            .option("kafka.ssl.keystore.password", "<keystore password>")
            .option("kafka.ssl.key.password", "<keystore password>")
            .option("kafka.ssl.endpoint.identification.algorithm", "")
            .option("subscribe",  "<my topic>")
            .load()
4

0 回答 0