2

target_table在使用 Spark Structured Streaming将新批次写入 HDFS () 后,我想将 Hive 表 ( ) 作为 DataFrame 加载target_table_dir,如下所示:

df.writeStream
  .trigger(processingTime='5 seconds')
  .foreachBatch(lambda df, partition_id:
    df.write
      .option("path", target_table_dir)
      .format("parquet")
      .mode("append")
      .saveAsTable(target_table))
  .start()

当我们立即从 Hive 表中读回相同的数据时,我们会得到一个“未找到分区异常”。如果我们延迟阅读,我们的数据是正确的。

似乎 Spark 仍在将数据写入 HDFS,而执行已停止并且 Hive Metastore 已更新,但数据仍在被写入 HDFS。

如何知道何时将数据写入 Hive 表(写入 HDFS)完成?

注意:我们发现如果我们在写出后使用 processAllAvailable(),后续读取工作正常。但是如果我们正在处理连续流,则 processAllAvailable() 将永远阻塞执行

4

0 回答 0