2

我有一个用例,我想通过 to_parquet(ddf, 'TestParquet', append=True) 将多个 Dask 数据帧存储到一个公共镶木地板存储中。

parquet 文件的结构是通过写入的第一个数据帧设置的(没有 append=True)。

每个数据框都有分类索引列。

类别从一开始就在所有数据帧中都是已知的,并且没有数据帧具有共同的类别。

数据框在整个类别列表上进行分区(因此每个类别在保存到镶木地板之前都有空分区)。最终,一旦所有 daframe 都保存到 parquet 中,所有类别/分区都将包含数据。

问题:附加第二个数据框后,索引不能用于检索特定类别。

这是一个最小的可重现示例:

熊猫:0.24.2 fastparquet:0.4.1 dask:2.22.0

import pandas as pd
import dask.dataframe as dd
import fastparquet

data= {'Name': [ 'C', 'B','F', 'B', 'F', 'B'], 'ID':[1, 1, 2, 2, 3, 3], 'Value':[2,3,4,1,2,3]}
df = pd.DataFrame(data)
df['Name']=df['Name'].astype(pd.api.types.CategoricalDtype(categories=['A', 'B', 'C', 'D', 'E', 'F'], ordered=True) )
ddf = dd.from_pandas(df, npartitions=2)
ddf=ddf.set_index('Name', sorted=False).repartition(divisions=['A', 'B', 'C', 'D', 'E', 'F', 'F'], force=True)
ddf_parquet=dd.read_parquet('./TestParquet')

data2= {'Name': ['D', 'E', 'A', 'A','D', 'A'], 'ID':[1, 1, 2, 3, 3, 4], 'Value':[1,2,3, 4,5,6]}
df2 = pd.DataFrame(data2)
df2['Name']=df2['Name'].astype(pd.api.types.CategoricalDtype(categories=['A', 'B', 'C', 'D', 'E', 'F'], ordered=True) )
ddf2 = dd.from_pandas(df2, npartitions=2)
ddf2=ddf2.set_index('Name', sorted=False).repartition(divisions=['A', 'B', 'C', 'D', 'E', 'F', 'F'], force=True)
dd.to_parquet(ddf2, './TestParquet', engine='fastparquet', append=True, ignore_divisions=True)
ddf_parquet2=dd.read_parquet('./TestParquet')

将第一个 Dataframe 保存到 parquet 后,我​​可以毫无问题地使用索引:

ddf_parquet.loc['B'].head()

      ID  Value
Name
F      3      2
F      2      4

但是,在附加第二个数据帧之后,尝试选择除第一个分区 ('B') 的索引值之外的任何内容都会导致错误:

ddf_parquet2.loc['A'].head()


Traceback (most recent call last):
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4805, in get_slice_bound
    return self._searchsorted_monotonic(label, side)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4756, in _searchsorted_monotonic
    return self.searchsorted(label, side=side)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/base.py", line 1501, in searchsorted
    return self._values.searchsorted(value, side=side, sorter=sorter)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/arrays/categorical.py", line 1370, in searchsorted
    raise ValueError("Categorical not ordered\nyou can use "
ValueError: Categorical not ordered
you can use .as_ordered() to change the Categorical to an ordered one

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/dask/dataframe/methods.py", line 42, in try_loc
    return loc(df, iindexer, cindexer)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/dask/dataframe/methods.py", line 28, in loc
    return df.loc[iindexer]
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexing.py", line 1500, in __getitem__
    return self._getitem_axis(maybe_callable, axis=axis)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexing.py", line 1867, in _getitem_axis
    return self._get_slice_axis(key, axis=axis)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexing.py", line 1533, in _get_slice_axis
    slice_obj.step, kind=self.name)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4673, in slice_indexer
    kind=kind)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4872, in slice_locs
    start_slice = self.get_slice_bound(start, 'left', kind)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4808, in get_slice_bound
    raise err
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4802, in get_slice_bound
    slc = self._get_loc_only_exact_matches(label)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4772, in _get_loc_only_exact_matches
    return self.get_loc(key)
  File "/cba/local/mx/Linux_x86_64/python/Python-3.6.2/lib/python3.6/site-packages/pandas/core/indexes/category.py", line 438, in get_loc
    raise KeyError(key)
KeyError: 'A'

我不知道为什么在索引中找不到“A”键以及为什么错误建议对分类进行排序,因为在设置索引之前对“名称”类别进行了排序。

一项观察:

  • 而两个数据帧的划分按预期设置为:

    ('A','B','C','D','E','F','F')

保存到 Parquet 并读回数据后,在保存第一个 Dataframe 后,这些分区将重新对齐到第一个 Dataframe 的非空分区: 

('B', 'C', 'F', 'F')

因此,我必须使用 ignore_partitions=True 附加第二个 Daframe 否则我收到一个错误,指出分区重叠(这正是我在保存之前对整个列表类别重新分区以尝试从第一个数据帧)。

重置索引和重新索引实际上并不可行,因为实际数据集非常庞大(在保存到 Parquet 之前总共大约 200GB)。

设置每个数据帧的划分以匹配其实际类别,最终会导致相同的错误。

任何帮助将非常感激。

4

0 回答 0