0

我有一个从 Kafka 作为源读取的流式查询。我想对从流中接收到的每个批次执行一些逻辑。到目前为止,我是这样做的

val streamDF = spark
               .readStream
               ...
               .load()

//val bc = spark.sparkContext.broadcast(spark)

streamDF
     .writeStream
     .foreach( new ForeachWriter[Row] {
             def open(partitionId: Long, version: Long): Boolean = {true}
            
             def process(record: String) = {
                         val aRDD = spark.sparkContext.parallelize(Seq('a','b','C'))
                         val aDF = spark.createDataframe(aRDD)
                         //val aDF = bc.vlaue.createDataframe(aRDD)
                 
                             // do something with aDF
               }

             def close(errorOrNull: Throwable): Unit = {}
  }
).start()

我使用的是 Spark 2.3.2,所以我坚持使用 ForeachWriter(我不能使用 foreachBatch,这会让我的生活更简单)。我也知道 foreach() 对执行程序执行。所以,记住这一点,我向所有执行者广播了 sparkSession。但这也无济于事。这是代码片段的注释部分。

我正在寻找一种解决方案,在 Spark 2.3.2 中将数据处理为 foreach 中的数据框(我必须使用数据框/数据集,因为操作非常繁重......它们也包括操作)

我发现了一个类似的问题,但没有任何回应 -->类似的 q

4

1 回答 1

0

抱歉,不是真的,但不可能在 Executor 上创建数据框。

数据框是 Spark 中的分布式集合。它们只能在 Driver 节点上或通过 Spark 应用程序中的转换(通过操作)创建。

于 2021-06-24T13:23:19.153 回答