我之前已经成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流。我尝试使用在线示例:https ://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
使用以下类似代码:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
但是,我总是以以下错误告终:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
在创建 ds1 时,我还尝试将其添加到我的选项集中:
.option("partition.assignment.strategy", "range")
但即使明确地为它分配一个值也不能阻止错误,我可以在网上或 Kafka 文档中找到的任何其他值(如“roundrobin”)也没有。
我还尝试使用“assign”选项并获得相同的错误(我们的 Kafka 主机设置为分配 - 每个消费者只分配一个分区,并且我们没有任何重新平衡)。
知道这里发生了什么吗?该文档没有帮助(可能是因为它仍处于实验阶段)。另外,是否有使用 KafkaUtils 进行结构化流式处理的方法?或者这是唯一的网关?