1

我想知道下面的伪代码是否是从 PySpark(Azure Databricks)读取存储在 Azure Data Lake 中的日期范围之间的多个镶木地板文件的有效方法。注意:parquet 文件不按日期分区。

我使用 uat/EntityName/2019/01/01/EntityName_2019_01_01_HHMMSS.parquet 约定在 ADL 中存储数据,如 Nathan Marz 的《大数据》一书中所建议的,稍作修改(使用 2019 而不是 year=2019)。

使用 * 通配符读取所有数据:

df = spark.read.parquet(uat/EntityName/*/*/*/*)

添加一个列 FileTimestamp,使用字符串操作从 EntityName_2019_01_01_HHMMSS.parquet 中提取时间戳并转换为 TimestampType()

df.withColumn(add timestamp column)

使用过滤器获取相关数据:

start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)

本质上,我使用 PySpark 来模拟 U-SQL 中可用的简洁语法:

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    date >= System.DateTime.Parse("2016/1/1") AND
    date < System.DateTime.Parse("2016/2/1");
4

1 回答 1

1

分区数据的正确方法是对数据使用格式 year=2019、month=01 等。

当您使用过滤器查询此数据时,例如:

df.filter(df.year >= myYear)

然后 Spark 将只读取相关文件夹。

过滤列名称准确地出现在文件夹名称中是非常重要的。请注意,当您使用 Spark(例如按年、月、日)写入分区数据时,它不会将分区列写入 parquet 文件。相反,它们是从路径中推断出来的。这确实意味着您的数据框在编写时将需要它们。当您从分区源读取时,它们也将作为列返回。

如果您无法更改文件夹结构,您始终可以手动减少 Spark 使用 regex 或 Glob 读取的文件夹 - 本文应该使用 Date Ranges 提供更多关于分区数据的上下文 Spark SQL 查询。但显然这更加手动和复杂。

更新:进一步的示例我可以将多个文件从 S3 读取到 Spark Dataframe 中,传递不存在的文件吗?

同样来自 Bill Chambers 的“Spark - The Definitive Guide: Big Data Processing Made Simple”:

分区是一种工具,可让您在编写数据时控制存储的数据(以及存储位置)。当您将文件写入分区目录(或表)时,您基本上将列编码为文件夹。这允许您在稍后读取时跳过大量数据,允许您仅读取与您的问题相关的数据,而不必扫描完整的数据集。...

当您有一个读者在操作之前经常过滤的表时,这可能是您可以使用的最低限度的优化。例如,日期对于分区特别常见,因为在下游,我们通常只想查看前一周的数据(而不是扫描整个记录列表)。

于 2019-03-03T12:32:53.910 回答