2

我有一个大型的日常文件数据集,位于/some/data/{YYYYMMDD}.parquet(或者也可以是 smth like /some/data/{YYYY}/{MM}/{YYYYMMDD}.parquet)。

我在 mycat.yaml 文件中描述数据源如下:

sources:
  source_paritioned:
    args:
      engine: pyarrow
      urlpath: "/some/data/*.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource

我希望能够将文件子集(分区)读入内存,

如果我跑, source = intake.open_catalog('mycat.yaml').source_partitioned; print(source.npartitions) 我会得到0. 可能是因为分区信息还没有初始化。之后source.discover()source.npartitions更新为1726磁盘上单个文件的数量。

我将如何加载数据:

  • 仅针对给定的一天(例如 20180101)
  • 持续几天(例如 20170601 和 20190223 之间)?

如果这在 wiki 上的某处有所描述,请随时将我指向相应的部分。

注意:在考虑了更多之后,我意识到这可能与 dask 的功能有关,并且我的目标可能可以通过使用方法将源转换为 dask_dataframe 以某种方式实现.to_dask。因此dask在这个问题上贴上标签。

4

2 回答 2

2

至少有两种方法:

  1. 继续使用当前的方法将所有内容加载到 dask 中(使用*),然后将其子集加载到所需范围。

  2. 仅加载数据的特定子集。

对于选项 2,parameters选项intake很方便。因此,假设路径是/some/data/{YYYYMMDD}.parquet,修改后的目录条目将如下所示:

sources:
  source_partitioned:
    parameter:
      date:
        type: str
        default: "*"
    args:
      engine: pyarrow
      urlpath: "/some/data/{{ date }}.parquet"
    description: ''
    driver: intake_parquet.source.ParquetSource

在 Python 中,可以提供参数 date(在本例中为“str”)source = intake.open_catalog('mycat.yaml').source_partitioned(date='20211101')用于加载特定日期。

对于日期范围,事情有点棘手,因为一种方法是使用所需范围创建一些列表理解,然后连接单独加载的文件,但这对于大日期范围可能效率不高。在这些情况下,我会加载更大的块,例如按年份使用date="2017*",然后连接这些更大的块。

于 2021-11-10T05:55:40.057 回答
1

这是对我之前回答的评论的后续行动。

如果 parquet 文件按(非重叠)时间索引,则 dask 不需要将每个文件读入内存(dask 将只读取每个文件的元数据)。将加载所有文件的元数据,但仅将相关文件加载到内存中:

from dask.datasets import timeseries

# this df will have 30 partitions
df = timeseries()

# this query will only work with 1 partition
df.loc["2000-01-03"]

如果下游工作流使用大型数据帧的不同子集运行,这可能很有用,但需要哪些子集是动态更改的。因此,创建大型 dask 数据帧(仅使用元数据)的固定成本会产生一次,然后 dask 负责选择所需数据的子集。

如果 parquet 文件没有按时间索引并且时间信息仅在文件名中,那么 dask 将不会从文件名中解析信息。在这种情况下,一些选项是:

  • 编写一个自定义加载器函数,该函数将过滤所需的文件名并将它们传递给 dask。这可以降低创建 dask 数据帧的固定成本,并且在知道需要整体数据的哪个子集时很有用;

  • intake按照以前的答案使用。

于 2021-11-22T04:53:25.087 回答