As the title says, I need to consume messages from a topic in CloudKarafka (a free Kafka cluster) in my Databricks notebook. I have the following code:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('Simple CloudKarafka Read') \
    .getOrCreate()

df = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'SERVERS_COMMA_SEPARATED') \
    .option('subscribe', 'TOPIC_NAME') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('kafka.sasl.mechanisms', 'SCRAM-SHA-256') \
    .option('kafka.sasl.jaas.config', 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";') \
    .load()
But when I execute this code:
df.writeStream \
    .format('console') \
    .trigger(processingTime='2 seconds') \
    .start()
I get this error: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
Can you help me?
Thanks in advance.
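A possible cause, offered as a sketch rather than a confirmed fix: the Java Kafka client reads the SASL mechanism from `sasl.mechanism` (singular), while `sasl.mechanisms` is the librdkafka spelling. If the plural key is not picked up, the client likely falls back to its default GSSAPI mechanism, which expects a service name. SCRAM also normally uses `ScramLoginModule` rather than `PlainLoginModule`. The helper name `scram_kafka_options` below is hypothetical, and the `kafkashaded.` prefix is assumed to apply to the SCRAM module the same way it does in the code above.

```python
def scram_kafka_options(servers, topic, user, password):
    """Build Kafka source options for SASL_SSL with SCRAM-SHA-256.

    Hypothetical helper; the option keys follow the Java Kafka client
    naming with the 'kafka.' prefix expected by the Spark Kafka source.
    """
    # Assumption: the shaded package prefix matches the one in the question.
    jaas = (
        'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule '
        f'required username="{user}" password="{password}";'
    )
    return {
        'kafka.bootstrap.servers': servers,
        'subscribe': topic,
        'kafka.security.protocol': 'SASL_SSL',
        # Singular key: 'sasl.mechanism', not 'sasl.mechanisms'.
        'kafka.sasl.mechanism': 'SCRAM-SHA-256',
        'kafka.sasl.jaas.config': jaas,
    }


opts = scram_kafka_options(
    'SERVERS_COMMA_SEPARATED', 'TOPIC_NAME', 'USER', 'PASSWORD'
)

# In a notebook where `spark` is already defined:
# df = spark.readStream.format('kafka').options(**opts).load()
```

Passing the dictionary through `.options(**opts)` keeps the credentials and keys in one place, so the mechanism key and login module can be checked without re-reading the whole reader chain.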