我们可以使用Autoloader来跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载的文件列表?
我可以在 AWS Glue 作业书签中轻松执行此操作,但我不知道如何在 Databricks Autoloader 中执行此操作。
我们可以使用Autoloader来跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载的文件列表?
我可以在 AWS Glue 作业书签中轻松执行此操作,但我不知道如何在 Databricks Autoloader 中执行此操作。
.load("path")
.withColumn("filePath",input_file_name())
例如,您可以将 filePath 插入到您的流接收器,然后从那里获得不同的值或使用 forEatch / forEatchBatch,例如将其插入到 spark sql 表中
您可以使用结构化流获取加载到 S3 的文件的通知。对于已经加载的文件,可以检查s3_output_path目标路径。
df = (spark.readStream.format('cloudFiles') \
.option("cloudFiles.format", "json") \
.option("cloudFiles.region", "<aws region>) \
.option("cloudFiles.awsAccessKey",<ACCESS_KEY>) \
.option("cloudFiles.awsSecretKey", <SECRET_KEY>) \
.option ("cloudFiles.useNotifications", "true") \
.load(<s3_path>))
df.writeStream.format('delta').outputMode("append") \
.option("checkpointLocation", <checkpoint_path>) \
.start(<s3_output_path>)