256

我定期对超过 1500 万行的数据帧执行 pandas 操作,我希望能够访问特定操作的进度指示器。

pandas split-apply-combine 操作是否存在基于文本的进度指示器?

例如,在类似的情况下:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

wherefeature_rollup是一个有些复杂的函数,它采用许多 DF 列并通过各种方法创建新的用户列。对于大型数据帧,这些操作可能需要一段时间,所以我想知道是否可以在 iPython 笔记本中提供基于文本的输出,以更新我的进度。

到目前为止,我已经尝试了 Python 的规范循环进度指示器,但它们并没有以任何有意义的方式与 pandas 交互。

我希望我在 pandas 库/文档中忽略了一些东西,可以让人们知道拆分-应用-组合的进度。apply一个简单的实现可能会查看函数正在运行的数据帧子集的总数,并将进度报告为这些子集的已完成部分。

这可能是需要添加到库中的东西吗?

4

9 回答 9

513

由于大众需求,我在( )中添加了pandas支持。与其他答案不同,这不会显着减慢熊猫的速度——这是一个例子:tqdmpip install "tqdm>=4.9.0"DataFrameGroupBy.progress_apply

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

如果您对它的工作原理感兴趣(以及如何为自己的回调修改它),请参阅GitHub 上的示例、 PyPI上的完整文档,或导入模块并运行help(tqdm)。其他支持的功能包括mapapplymapaggregatetransform

编辑


要直接回答原始问题,请替换:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

和:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

注意: tqdm <= v4.8:对于低于 4.8 的 tqdm 版本,tqdm.pandas()您必须这样做:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
于 2015-12-18T23:36:34.753 回答
19

调整杰夫的答案(并将其作为可重用的功能)。

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

注意:应用进度百分比更新内联。如果你的函数标准输出那么这将不起作用。

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

像往常一样,您可以将其作为一种方法添加到您的 groupby 对象中:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

正如评论中提到的,这不是核心熊猫有兴趣实现的功能。但是 python 允许您为许多 pandas 对象/方法创建这些(这样做将是相当多的工作......尽管您应该能够概括这种方法)。

于 2013-09-04T10:37:32.340 回答
13

对于希望在其自定义并行 pandas-apply 代码上应用 tqdm 的任何人。

(多年来,我尝试了一些用于并行化的库,但我从未找到 100% 并行化解决方案,主要用于 apply 函数,而且我总是不得不回来获取我的“手动”代码。)

df_multi_core - 这是你调用的那个。它接受:

  1. 你的 df 对象
  2. 您要调用的函数名称
  3. 可以对其执行函数的列子集(有助于减少时间/内存)
  4. 并行运行的作业数(-1 或省略所有核心)
  5. df 函数接受的任何其他 kwargs(如“轴”)

_df_split - 这是一个内部辅助函数,必须全局定位到正在运行的模块(Pool.map 是“位置相关的”),否则我会在内部找到它。

这是我的gist中的代码(我将在那里添加更多 pandas 函数测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow 是使用 tqdm "progress_apply"进行并行应用的测试代码。

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

在输出中,您可以看到 1 个未并行化运行的进度条,以及并行化运行时的每核进度条。有一点小问题,有时其他核心会同时出现,但即便如此,我认为它很有用,因为您可以获得每个核心的进度统计信息(例如,it/sec 和总记录)

在此处输入图像描述

感谢@abcdaa 提供了这个很棒的图书馆!

于 2019-01-20T11:44:34.817 回答
12

如果您需要支持如何在 Jupyter/ipython 笔记本中使用它,就像我所做的那样,这里有一个有用的指南和相关文章的来源:

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

注意 import 语句中的下划线_tqdm_notebook。正如引用的文章所提到的,开发处于后期测试阶段。

截至 2021 年 11 月 12 日的更新

我目前正在使用pandas==1.3.4and tqdm==4.62.3,我不确定哪个版本的 tqdm 作者实现了此更改,但不推荐使用上面的 import 语句。而是使用:

 from tqdm.notebook import tqdm_notebook

自 2022 年 2 月 1 日起更新 现在可以简化 .py 和 .ipynb 文件的导入语句:

from tqdm.auto import tqdm
tqdm.pandas()

这对于两种类型的开发环境都应该按预期工作,并且应该适用于 pandas 数据帧或其他 tqdm 值得迭代的对象。

于 2018-02-13T19:46:17.963 回答
5

You can easily do this with a decorator

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

then just use the modified_function (and change when you want it to print)

于 2013-09-04T00:26:42.867 回答
4

这里使用的每个答案pandas.DataFrame.groupby。如果你想要一个没有 groupby 的进度条pandas.Series.apply,你可以在 jupyter-notebook 中做到这一点:

from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)
于 2021-10-01T11:09:06.660 回答
1

我已经更改了Jeff 的答案,包括一个总数,这样您就可以跟踪进度和一个变量来打印每 X 次迭代(如果“print_at”相当高,这实际上会大大提高性能)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

clear_output() 函数来自

from IPython.core.display import clear_output

如果不在 IPython 上,安迪·海登的答案就是没有它

于 2017-11-21T17:40:03.720 回答
0

对于 concat 操作:

df = pd.concat(
    [
        get_data(f)
        for f in tqdm(files, total=len(files))
    ]
)

tqdm 只返回一个可迭代的。

于 2021-11-30T01:56:10.857 回答
0

对于 , 等操作,merge可以使用 Dask 显示进度条。concatjoin

您可以将 Pandas DataFrames 转换为 Dask DataFrames。然后你可以显示 Dask 进度条。

下面的代码显示了简单的示例:

创建和转换 Pandas DataFrame

import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd

n = 450000
maxa = 700

df1 = pd.DataFrame({'lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n)})
df2 = pd.DataFrame({'rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n)})

sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)

与进度条合并

from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()

with TqdmCallback(desc="compute"):
    sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()

对于相同的操作,Dask 比 Pandas 更快并且需要更少的资源:

  • 熊猫74.7 ms
  • 达斯克20.2 ms

更多细节:

注1:我已经测试过这个解决方案:https ://stackoverflow.com/a/56257514/3921758但它对我不起作用。不测量合并操作。

注意 2:我已经检查tqdm了 Pandas 的“开放请求”,例如:

于 2021-08-26T10:26:49.027 回答