3

我们可以使用Autoloader来跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载的文件列表?

我可以在 AWS Glue 作业书签中轻松执行此操作,但我不知道如何在 Databricks Autoloader 中执行此操作。

4

2 回答 2

0
.load("path")
.withColumn("filePath",input_file_name())

例如,您可以将 filePath 插入到您的流接收器,然后从那里获得不同的值或使用 forEatch / forEatchBatch,例如将其插入到 spark sql 表中

于 2021-12-06T11:30:07.243 回答
0

您可以使用结构化流获取加载到 S3 的文件的通知。对于已经加载的文件,可以检查s3_output_path目标路径。

    df = (spark.readStream.format('cloudFiles') \
    .option("cloudFiles.format",    "json") \
    .option("cloudFiles.region", "<aws region>) \
    .option("cloudFiles.awsAccessKey",<ACCESS_KEY>) \
    .option("cloudFiles.awsSecretKey", <SECRET_KEY>) \
   .option ("cloudFiles.useNotifications", "true") \
   .load(<s3_path>))

    df.writeStream.format('delta').outputMode("append") \
      .option("checkpointLocation", <checkpoint_path>) \
      .start(<s3_output_path>)
于 2022-02-08T17:55:30.350 回答