0

我有一个关于 Dask+Parquet 的两部分问题。我正在尝试对从分区 Parquet 文件创建的 dask 数据帧运行查询,如下所示:

import pandas as pd
import dask.dataframe as dd
import fastparquet

##### Generate random data to Simulate Process creating a Parquet file ######

test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')

# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])


##### Write to partitioned parquet file, hive and simple #####

fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet',   data=test_df, partition_on=['name'], file_scheme='hive')

# now check partition sizes. Only Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions  # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!

我的目标是能够使用 dask 并行快速过滤和处理各个分区,如下所示:

df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>)   # operate on each partition

我很喜欢使用 Hive 风格的 Parquet 目录,但我注意到与直接从单个 parquet 文件中读取相比,它的操作时间要长得多。

有人可以告诉我实现这一目标的惯用方式吗?对于 Dask/Parquet 来说还是相当新的,所以如果这是一种令人困惑的方法,我们深表歉意。

4

1 回答 1

1

也许从文档字符串中并不清楚,但是“简单”文件类型根本不会发生按值分区,这就是它只有一个分区的原因。

至于速度,当数据非常小时,在一个函数调用中读取数据是最快的 - 特别是如果您打算执行任何操作,例如nunique需要组合来自不同分区的值。

在 Dask 中,每个任务都会产生开销,因此除非调用完成的工作量与开销相比很大,否则您可能会失败。此外,磁盘访问通常不是可并行化的,如果某些计算部分持有 GIL,它们可能无法在线程中并行运行。最后,分区版本包含更多要解析的 parquet 元数据。

>>> len(dd.read_parquet('test_hive.parquet').name.nunique())
12
>>> len(dd.read_parquet('test_simple.parquet').name.nunique())
6

TL;DR:确保您的分区足够大以保持 dask 忙碌。

(注意:唯一值的集合从 parquet 元数据中已经很明显了,根本不需要加载数据;但是 Dask 不知道如何进行这种优化,因为毕竟某些分区可能包含零行)

于 2020-04-07T14:25:02.750 回答