0

我想知道当我调用 a 而不是 pd.Dataframe 时如何返回 dask Dataframemap_partitions以避免内存问题。

输入数据框

id  | name   | pet_id
---------------------
1    Charlie  pet_1
2    Max      pet_2
3    Buddy    pet_3
4    Oscar    pet_4

预期输出map_partitions

pet_id | name    |    date    | is_healty
------------------------------------------
pet_1    Charlie   11-20-2018   False
pet_1    Charlie   02-17-2020   True
pet_1    Charlie   04-30-2020   True
pet_2    Max       10-17-2020   True
pet_3    Buddy     01-20-2020   True
pet_3    Buddy     12-12-2020   False
pet_4    Oscar     08-24-2019   True

如果我返回 pd.Dataframe,我已经完成了以下功能并且正在工作。但是,如果我返回一个 dask.dataframe,*** AssertionError则会引发

def get_pets_appointments(df):
    dask_ddf = None
    for k, pet_id in df["pet_id"].iteritems():
        _resp = pets.get_pet_appointments(pet_id) # http POST call
        tmp_df = pd.DataFrame(_resp)
        if dask_ddf is None:
            # First iteration, initialize Dask dataframe
            dask_ddf = dd.from_pandas(tmp_df, npartitions=1)
            continue
        # Work with Dask dataframe in order to avoid Memory Issues
        dask_ddf = dd.concat([dask_ddf, tmp_df])
    # this line works fine
    # return dask_ddf.compute()
    
    # this is raising AssertionError
    return dask_ddf

我正在调用函数如下

pets_app_df = pets_df.map_partitions(get_pets_appointments)
4

1 回答 1

1

简短回答:不(对不起)

的目的map_partitions是作用于 dask 数据帧的每个组成熊猫数据帧。期望是,您将创建一个与原始分区数相同的新数据帧。我认为您想将每个分区拆分为多个分区;你可以.repartition通过事先打电话来做到这一点。

然而,我很惊讶:

dask_ddf = dd.from_pandas(tmp_df, npartitions=1)
...
dask_ddf = dd.concat([dask_ddf, tmp_df])

您在此处提供的两个数据帧都在内存中,那么将它们变成 dask-dataframe 对您有何帮助?重复连接(实际上是附加)不是一个很好的模型。

于 2020-12-01T14:53:26.533 回答