我有一个关于 Dask+Parquet 的两部分问题。我正在尝试对从分区 Parquet 文件创建的 dask 数据帧运行查询,如下所示:
import pandas as pd
import dask.dataframe as dd
import fastparquet
##### Generate random data to Simulate Process creating a Parquet file ######
test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')
# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])
##### Write to partitioned parquet file, hive and simple #####
fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet', data=test_df, partition_on=['name'], file_scheme='hive')
# now check partition sizes. Only Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!
我的目标是能够使用 dask 并行快速过滤和处理各个分区,如下所示:
df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>) # operate on each partition
我很喜欢使用 Hive 风格的 Parquet 目录,但我注意到与直接从单个 parquet 文件中读取相比,它的操作时间要长得多。
有人可以告诉我实现这一目标的惯用方式吗?对于 Dask/Parquet 来说还是相当新的,所以如果这是一种令人困惑的方法,我们深表歉意。