0

我有一个非常大的熊猫数据框,如下所示:

╔════════╦═════════════╦════════╗
║ index  ║    Users    ║ Income ║
╠════════╬═════════════╬════════╣
║ 0      ║ user_1      ║    304 ║
║ 1      ║ user_2      ║    299 ║
║ ...    ║             ║        ║
║ 399999 ║ user_400000 ║    542 ║
╚════════╩═════════════╩════════╝

(有几列需要做一些计算)

因此,对于每个客户,我必须应用大量的操作(移位、求和、减法、条件等),因此(我认为)不可能对所有内容应用布尔掩码,我已经尝试过,所以我的问题是可以将 pandas 数据框分成如下块,例如:

# chunk 1
╔════════╦═════════════╦════════╗
║ index  ║    Users    ║ Income ║
╠════════╬═════════════╬════════╣
║ 0      ║ user_1      ║    304 ║
║ 1      ║ user_2      ║    299 ║
║ ...    ║             ║        ║
║ 19999  ║ user_20000  ║    432 ║
╚════════╩═════════════╩════════╝

# chunk 2
╔════════╦═════════════╦════════╗
║ index  ║    Users    ║ Income ║
╠════════╬═════════════╬════════╣
║ 20000  ║ user_20000  ║    199 ║
║ 20001  ║ user_20001  ║    412 ║
║ ...    ║             ║        ║
║ 39999  ║ user_40000  ║    725 ║
╚════════╩═════════════╩════════╝

# chunk K 
╔════════╦═════════════╦════════╗
║ index  ║    Users    ║ Income ║
╠════════╬═════════════╬════════╣
║ ...    ║ user_...    ║    ... ║
║ ...    ║ user_...    ║    ... ║
║ ...    ║             ║        ║
║ ...    ║ user_...    ║    ... ║
╚════════╩═════════════╩════════╝

并同时应用所有这些块的所有操作。

4

2 回答 2

2

您可以使用多处理池来完成其中一些任务,但是,多处理也是一项昂贵的操作,因此您需要测试并行化它是否实际上更快,这取决于您正在运行的函数的类型和数据,例如我创建了一个示例df

import pandas as pd
import numpy as np
from random import randint
from multiprocessing import Pool, cpu_count
from timeit import timeit


def f(df: pd.DataFrame):
    df['Something'] = df['Users'].apply(lambda name: len(name))
    df['Other stuff'] = df['Income'].apply(lambda income: 'Senior' if income > 200 else 'Junior')
    df['Some other stuff'] = df['Users'].apply(lambda name:  name.count('1'))
    return df


if __name__ == '__main__':
    samples = 5000000
    df = pd.DataFrame(
        [
            ['user_' + str(i), randint(0, 500)] for i in range(1, samples)
        ], columns=['Users', 'Income']
    )

如果我们使用多处理来计时这个版本的f功能,我会38.189200899999996使用我的旧笔记本电脑:

    parallelized = timeit("""
cores = cpu_count()
df_in_chunks = np.array_split(df, cores)
pool = Pool(cores)
result_df = pd.concat(pool.map(f, df_in_chunks))
pool.close()
pool.join()
    """, 
    "from __main__ import pd, np, df, Pool, cpu_count, f", 
    number=5
    )
    print(parallelized)

在这种情况下,我得到25.0754394,因此使用多处理的开销大于在单个内核中运行整个事物的执行时间。

    not_parallelized = timeit("""
result_df = f(df)
    """, 
    "from __main__ import pd, df, f", 
    number=5
    )
    print(not_parallelized)

但是,如果我们为函数添加更多复杂性,f那么在某个点上,df向每个进程广播比在单个内核中运行它更便宜。

于 2020-03-15T19:45:09.823 回答
0

据我所知,熊猫GroupBy:-Split,-Apply,-Combine可能会解决您的问题。将你的DataFrame分成多个块(组),然后对每个块(组)应用一个自定义函数。如果您有任何进一步的问题,我们可以讨论代码。

希望这可以帮助!

于 2020-03-15T19:12:11.367 回答