我有一个从 Kafka 作为源读取的流式查询。我想对从流中接收到的每个批次执行一些逻辑。到目前为止,我是这样做的
val streamDF = spark
.readStream
...
.load()
//val bc = spark.sparkContext.broadcast(spark)
streamDF
.writeStream
.foreach( new ForeachWriter[Row] {
def open(partitionId: Long, version: Long): Boolean = {true}
def process(record: String) = {
val aRDD = spark.sparkContext.parallelize(Seq('a','b','C'))
val aDF = spark.createDataframe(aRDD)
//val aDF = bc.vlaue.createDataframe(aRDD)
// do something with aDF
}
def close(errorOrNull: Throwable): Unit = {}
}
).start()
我使用的是 Spark 2.3.2,所以我坚持使用 ForeachWriter(我不能使用 foreachBatch,这会让我的生活更简单)。我也知道 foreach() 对执行程序执行。所以,记住这一点,我向所有执行者广播了 sparkSession。但这也无济于事。这是代码片段的注释部分。
我正在寻找一种解决方案,在 Spark 2.3.2 中将数据处理为 foreach 中的数据框(我必须使用数据框/数据集,因为操作非常繁重......它们也包括操作)
我发现了一个类似的问题,但没有任何回应 -->类似的 q