我在设置使用 TLS从spark连接到 Kafka 所需的参数时遇到问题。这是我目前的做法:
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<my url>:tls port")
.option("security.protocol", "SSL")
.option("ssl.key.password", "<key password>")
.option("ssl.keystore.location", "<keystore location>")
.option("ssl.keystore.password", "<keystore password>")
.option("ssl.endpoint.identification.algorithm", "")
.option("subscribe", "<my topic>")
.load()
我还尝试使用前缀kafka.
并将配置包含在我的 spark 提交中(使用--conf
或包含.jks
文件位置--files
)。如果我使用spark.read
而不是spark.readStream
.
问题可以在日志中表示,其中我设置的参数仍然为空或继续具有默认值。此外,连接失败,就像我在不使用 TLS 证书的情况下尝试连接时一样(这是我当前的 kafka 所必需的):
{"Application":"My test application" ,"level": "INFO ", "timestamp": "2021-05-20 15:33:07,485", "classname": "org.apache.kafka.clients.consumer.ConsumerConfig", "body": "ConsumerConfig values:
[...]
security.protocol = PLAINTEXT
[...]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
[...]
ssl.keystore.location = null
ssl.keystore.password = null
[...]
{"Application":"My test application" ,"level": "WARN ", "timestamp": "2021-05-20 15:33:08,669", "classname": "org.apache.kafka.clients.NetworkClient", "body": "[Consumer clientId=consumer-sample_table-1, groupId=sample_table] Bootstrap broker <my ip> (id: -1 rack: null) disconnected"}
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
目前,我正在使用spark 3.0.0
and scala 2.12
。另外,我正在使用以下命令提交作业:
$SPARK_HOME/spark-submit --name "My application" \
--master yarn \
--deploy-mode client \
--class <main class> \
application.jar
有没有人有类似的问题?谢谢你。
更新 使用以下选项解决了我的问题:
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<my url>:tls port")
.option("kafka.security.protocol"", "SSL")
.option("kafka.ssl.keystore.location", "<keystore location>")
.option("kafka.ssl.keystore.password", "<keystore password>")
.option("kafka.ssl.key.password", "<keystore password>")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("subscribe", "<my topic>")
.load()