我想使用Databricks Auto Loader设置 S3 流。我设法设置了流,但我的 S3 存储桶包含不同类型的 JSON 文件。我想将它们过滤掉,最好是在流本身中而不是使用filter
操作。
根据文档,我应该能够使用 glob 模式进行过滤。但是,我似乎无法让它工作,因为它无论如何都会加载所有内容。
这就是我所拥有的
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaInference.samleSize.numFiles", 1000)
.option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
.option("includeExistingFiles", "true")
.option("multiLine", "true")
.option("inferSchema", "true")
# .option("cloudFiles.schemaHints", schemaHints)
# .load("s3://<BUCKET>/qualifier/**/*_INPUT")
.load("s3://<BUCKET>/qualifier")
.withColumn("filePath", F.input_file_name())
.withColumn("date_ingested", F.current_timestamp())
)
我的文件有一个结构为 的键qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json
,因此我想过滤包含名称输入的文件。
这似乎加载了所有内容:这就是我想要做的.load("s3://<BUCKET>/qualifier")
,.load("s3://<BUCKET>/qualifier/**/*_INPUT")
但这不起作用。(我也试过.load("s3://<BUCKET>/qualifier/**/*_INPUT.json"
)
我的 glob 模式是不正确的,还是我还缺少其他东西?