0

我想使用Databricks Auto Loader设置 S3 流。我设法设置了流,但我的 S3 存储桶包含不同类型的 JSON 文件。我想将它们过滤掉,最好是在流本身中而不是使用filter操作。

根据文档,我应该能够使用 glob 模式进行过滤。但是,我似乎无法让它工作,因为它无论如何都会加载所有内容。

这就是我所拥有的

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaInference.samleSize.numFiles", 1000)
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("includeExistingFiles", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
#   .option("cloudFiles.schemaHints", schemaHints)
#  .load("s3://<BUCKET>/qualifier/**/*_INPUT")
  .load("s3://<BUCKET>/qualifier")
  .withColumn("filePath", F.input_file_name())
  .withColumn("date_ingested", F.current_timestamp())
)

我的文件有一个结构为 的键qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json,因此我想过滤包含名称输入的文件。

这似乎加载了所有内容:这就是我想要做的.load("s3://<BUCKET>/qualifier").load("s3://<BUCKET>/qualifier/**/*_INPUT")但这不起作用。(我也试过.load("s3://<BUCKET>/qualifier/**/*_INPUT.json"

我的 glob 模式是不正确的,还是我还缺少其他东西?

4

0 回答 0