我正在尝试在纱线集群上使用 Kafka 测试结构化流的 foreach 并拥有这段小代码:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "data01:9092,data02:9092,data03:9092")
.option("subscribe", "cluster-topic-01")
.load()
/* This block works
val query = ds1.writeStream
.format("console")
.start()
*/
// this block doesn't have any call to open()
val query = ds1.writeStream.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
println("open("+partitionId+","+version+")")
true
}
override def process(record: Row) : Unit = {
// write string to connection
println("process() :: "+record)
}
override def close(errorOrNull: Throwable): Unit = {
// close the connection
println("close()")
}
}).start()
query.awaitTermination()
当我运行这个程序时,没有调用 open()。我已经比较了其他线程的其他示例,但我无法找出缺少的内容。在 foreach 之后有一个 start() 调用,这似乎与此处的文档相匹配。
我可以流式传输到文件或控制台,但无法在 foreach 中获得任何调用。
关于在哪里看的任何建议?TIA。