0

情况:我正在生成一个增量文件夹,其中包含来自先前流式查询 A 的数据,稍后从另一个 DF 读取数据,如此处所示

DF_OUT.writeStream.format("delta").(...).start("path")

(...)

DF_IN = spark.readStream.format("delta").load("path)

1 -当我尝试在同一程序的后续 readStream(ETL 管道的链接查询)中以这种方式读取它时,我最终会遇到下面的异常。

2 -但是,当我在 scala REPL 中运行它时,它运行顺利。
不确定那里发生了什么,但它确实令人费解。

org.apache.spark.sql.AnalysisException: Table schema is not set.  Write data into it or use CREATE TABLE to set the schema.;
  at org.apache.spark.sql.delta.DeltaErrors$.schemaNotSetException(DeltaErrors.scala:365)
  at org.apache.spark.sql.delta.sources.DeltaDataSource.sourceSchema(DeltaDataSource.scala:74)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:209)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:95)
  at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:95)
  at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
  at org.apache.spark.ui.DeltaPipeline$.main(DeltaPipeline.scala:114)

4

2 回答 2

1

来自Delta Lake 快速指南 - 故障排除

  • 表架构未设置错误

问题:当 Delta 表的路径不存在,并尝试从中流式传输数据时,会出现以下错误。org.apache.spark.sql.AnalysisException:未设置表架构。将数据写入其中或使用 CREATE TABLE 设置架构。

解决方案:确保创建了 Delta 表的路径。

于 2021-01-31T12:27:40.100 回答
0

阅读错误消息后,我确实尝试成为一个好孩子并遵循建议,所以我试图确保在调用readStream之前我试图从中读取的 delta 文件夹中确实存在有效数据,瞧!


 def hasFiles(dir: String):Boolean = {
        val d = new File(dir)
        if (d.exists && d.isDirectory) {
                d.listFiles.filter(_.isFile).size > 0
        } else false
   }

DF_OUT.writeStream.format("delta").(...).start(DELTA_DIR)

while(!hasFiles(DELTA_DIR)){
         print("DELTA FOLDER STILL EMPTY")
         Thread.sleep(10000)
      }

print("FOUND DATA ON DELTA A - WAITING 30 SEC")
Thread.sleep(30000)

DF_IN = spark.readStream.format("delta").load(DELTA_DIR)

它最终工作了,但我必须确保等待足够的时间让“某事发生”(不知道究竟是什么 TBH,但似乎从 delta 读取需要一些写入才能完成 - 也许是元数据? -

然而,这仍然是一个黑客。我希望可以从一个空的 delta 文件夹开始读取并等待内容开始涌入其中。

于 2019-12-28T14:30:39.970 回答