0

当我在 mp 库启动的每个处理器中对分块 Pandas 数据帧使用多处理时,我得到一个未找到表的错误。
我以下列方式将 pandasql 库用于 SQL:

import pandasql import sqldf
pysqldf = lambda q: sqldf(q, globals())

df = pd.DataFrame({'a': [1,2,4,3,6,1,2], 'b': ['a','a','b','b','c','c','c']})

这适用于单线程:

sorted_df = pysqldf("select * from df order by b, a")

当我应用多处理并行处理每个 df 块时,它不起作用:

def parallelize_dataframe(df, func):
    unique_bs = df.b.unique().tolist()
    df_split = [df[df.a == l] for l in unique_bs]
    df = pd.concat(pool.map(func, df_split))
    pool = Pool(num_cores)
    pool.close()
    pool.join()
    return df

def sort_chunks(data):
    sorted_data = pysqldf("select * from data order by b, a")
    return sorted_data

sorted_df = parallelize_dataframe(df, sort_chunks)

我得到的错误如下:

PandaSQLException:(sqlite3.OperationalError)没有这样的表:数据[SQL:'select * from data'](此错误的背景: http ://sqlalche.me/e/e3q8 )

我明白错误告诉我什么。基本上,在数据库中找不到每个处理器中的数据 DF。我不确定这个问题的解决方法是什么。任何输入将不胜感激。谢谢你。

4

1 回答 1

0

我想到了。问题是命名空间。本质上,pysqldf 接受两个参数——查询和命名空间,它们是全局的或本地的。如果 sqldf 函数被包装在一个函数中并且需要使用该函数的本地数据框,则命名空间参数应该是本地的。

将排序数据功能修改为以下解决了该问题:

def sort_chunks(data):
    sorted_data = pysqldf("select * from data order by b, a", locals())
    return sorted_data
于 2018-07-04T18:06:38.033 回答