1

我正在尝试过滤 Dask DataFrame,然后使用map_partitions将函数应用于每个分区。DataFrame该函数需要一个至少有 1 行的 pandas 。

这是为 MCVE 生成一些虚拟数据作为pandas DataFrame(然后转换为 Dask DataFrame)的代码

def create_data(n):
    df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
    random_integers = np.random.default_rng().choice(14, size=n, replace=False)
    df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
    return df

df = create_data(n=10)
print(df.head(15))
>>>
    store_id         A
0         10  0.850730
1         10  0.581119
2         10  0.825802
3         10  0.657797
4         10  0.291961
5         10  0.864984
6          9  0.161334
7          9  0.397162
8          9  0.089300
9          9  0.435914
10         9  0.750741
11         9  0.920625
12         3  0.635727
13         3  0.425270
14         3  0.904043

数据结构:对于每个store_id,正好有 6 行。

现在我创建了一些store_id我想用来过滤上述数据的 s列表

filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
print(filtered_store_ids)
>>> [13, 12, 11, 10, 9, 7]

然后我将上述数据(熊猫DataFrame)转换为dask.dataframe

ddf = dd.from_pandas(df, npartitions=10)

现在我打印的分区ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=6
Partition Index=9, Number of Rows=6

这是意料之中的。每个分区有 6 行和一个 (unique) store_id。因此,每个分区都包含单个store_id.

store_id我现在使用上面的 s列表过滤 Dask 数据框

ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]

我再次打印过滤后的分区ddf

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=0
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=0
Partition Index=5, Number of Rows=6
Partition Index=6, Number of Rows=6
Partition Index=7, Number of Rows=6
Partition Index=8, Number of Rows=0
Partition Index=9, Number of Rows=6

这是意料之中的,因为每个分区都有一个store_id,并且通过过滤,一些分区将被完全过滤掉,因此它们将包含零行。

Dataframe所以,现在我将根据Dask DataFrame 最佳实践重新分区过滤

ddf = ddf.repartition(npartitions=len(filtered_store_ids))
print(ddf)
>>>
Dask DataFrame Structure:
              store_id        A
npartitions=6                  
0                int64  float64
6                  ...      ...
...                ...      ...
48                 ...      ...
59                 ...      ...
Dask Name: repartition, 47 tasks

我预计这种重新分区操作只会产生大小均匀的非空分区。但是,现在当我重新打印分区时,我得到了与前一个类似的输出(不均匀的分区大小和一些空分区),就好像重新分区没有发生一样

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=0
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=12
Partition Index=5, Number of Rows=6

我的下一步是在过滤后对每个分区应用一个函数,但这不起作用,因为有一些分区(pandas DataFrame)由于缺少行而函数无法处理。

def myadd(df):
    assert df.shape[0] > 0
    ...
    return ...

ddf.map_partitions(myadd)
>>> AssertionError                            Traceback (most recent call last)
.
.
.
AssertionError: 

重新分区的 Dask 文档解释得很好(与我上面链接的最佳实践相同),它看起来很简单,但是在重新分区之后,我仍然得到一些零行的分区,并且map_partitions会在这里失败。我确定我在这里遗漏了一些东西。

有几篇关于重新分区(12)的 SO 帖子,但它们不处理空分区。

问题

有没有办法确保在重新分区后,所有分区将再次有 6 行并且没有空分区?即是否有可能有一个重新分区的 DaskDataFrame具有相同大小(非空)的分区?

编辑

目前,Dask 似乎无法处理空分区:问题12。这些可能与我在这里遇到的问题有关。

4

1 回答 1

3

我从 SO 中找到了两个现有帖子

我用它们来解决这个问题。

从问题的原始代码开始(无需更改)

.
<identical code from question here>
.
ddf = ddf.repartition(npartitions=len(filtered_store_ids))

接下来我只是在repartitioned上依次调用这两个函数ddf

ddf = cull_empty_partitions(ddf)  # remove empties
ddf = _rebalance_ddf(ddf)         # re-size

当我现在重新打印分区大小时,所有分区的大小都是均匀的,没有一个是空的

for p in range(ddf.npartitions):
    print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
>>>
Partition Index=0, Number of Rows=6
Partition Index=1, Number of Rows=6
Partition Index=2, Number of Rows=6
Partition Index=3, Number of Rows=6
Partition Index=4, Number of Rows=6
Partition Index=5, Number of Rows=6
于 2020-05-09T03:44:34.793 回答