0

df在 pySpark 中有一些数据框,它来自于调用:

df = spark.sql("select A, B from org_table")
df = df.stuffIdo

我想org_table在我的脚本末尾覆盖。由于禁止覆盖输入表,我检查了我的数据:

sparkContext.setCheckpointDir("hdfs:/directoryXYZ/PrePro_temp")
checkpointed = df.checkpoint(eager=True)

血统现在应该被打破,我也可以用checkpointed.show()(作品)查看我的检查点数据。什么不起作用是写表:

checkpointed.write.format('parquet')\
    .option("checkpointLocation", "hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite').saveAsTable('org_table')

这会导致错误:

引起:java.io.FileNotFoundException:文件不存在:hdfs://org_table_path/org_table/part-00081-4e9d12ea-be6a-4a01-8bcf-1e73658a54dd-c000.snappy.parquet

我已经尝试了几件事,比如在写作之前刷新 org_table 等,但我在这里感到困惑。我该如何解决这个错误?

4

1 回答 1

0

我会小心转换输入是新输出的此类操作。这样做的原因是,如果出现任何错误,您可能会丢失数据。假设您的转换逻辑有问题,并且您生成了无效数据。但你只在一天后看到了这一点。此外,要修复错误,您不能使用刚刚转换的数据。您需要转换前的数据。您如何使数据再次保持一致?

另一种方法是:

  • 暴露视图
  • 在每批你都在写一个新表,最后你只用这个新表替换视图
  • 几天后,您还可以计划一项清理工作,该工作将删除过去 X 天的表格

如果您想继续使用您的解决方案,为什么不简单地这样做而不是处理检查点呢?

df.write.parquet("hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite')

df.load("hdfs:/directoryXYZ/PrePro_temp").write.format('parquet').mode('overwrite').saveAsTable('org_table')

当然,您将读取数据两次,但它看起来不像带有检查点的那次那样笨拙。此外,您每次都可以将“中间”数据存储在不同的目录中,因此您可以解决我在开始时暴露的问题。即使您有错误,您仍然可以通过简单地选择一个好的目录并执行.write.format(...)org_table 来带来有效版本的数据。

于 2019-06-28T12:07:14.193 回答