0

我想使用结构化流向文本文件附加行。此代码导致SparkException: Task not serializable. 我认为toDF是不允许的。我怎样才能让这段代码工作?

df.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = {
      true 
    }

    override def process(row: Row): Unit = {
       val df = Seq(row.getString(0)).toDF

       df.write.format("text").mode("append").save(output)
    } 

    override def close(errorOrNull: Throwable): Unit = {
    }      
  }).start
4

1 回答 1

2

你不能调用df.write.format("text").mode("append").save(output)内部process方法。它将在执行者端运行。您可以改用文件接收器,例如

df.writeStream.format("text")....
于 2017-03-09T05:01:01.957 回答