1

我试图使用 dask 的并行化机制为 8000 万条英语推文获取 fastText 句子嵌入,如本答案所述: How do you parallelize apply() on Pandas Dataframes using all cores on one machine?

这是我的完整代码:

import dask.dataframe as dd
from dask.multiprocessing import get
import fasttext
import fasttext.util
import pandas as pd

print('starting langage: ' + 'en')
lang_output = pd.DataFrame()
lang_input = full_input.loc[full_input.name == 'en'] # 80 Million English tweets
ddata = dd.from_pandas(lang_input, npartitions = 96)
print('number of lines to compute: ' + str(len(lang_input)))
fasttext.util.download_model('en', if_exists='ignore')  # English
ft = fasttext.load_model('cc.'+'en'+'.300.bin')
fasttext.util.reduce_model(ft, 20)
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
print('finished en')

这是get_fasttext_sentence_embedding函数:

def get_fasttext_sentence_embedding(row, ft):
    if pd.isna(row):
        return np.zeros(20)
    return ft.get_sentence_vector(row)

但是,我在这一行得到一个酸洗错误:

lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')

这是我得到的错误:

TypeError: can't pickle fasttext_pybind.fasttext objects

有没有办法将 fastText 模型 get_sentence_vector 与 dask (或其他任何东西)并行化?我需要并行化,因为获取 8000 万条推文的句子嵌入需要两个时间,而且我的数据框的一行完全独立于另一行。

4

2 回答 2

1

我遇到了同样的问题,但我找到了使用 Multiprocessing - Python 的标准库的解决方案。

第一步 - 包装

model = fasttext.load_model(file_name_model)

def get_vec(txt):
    ''' 
    First tried to put model.get_sentence_vector into map (look below), but it resulted in pickle error.
    This works, lol.
    '''
    return model.get_sentence_vector(txt)

然后,我这样做:

from multiprocessing import Pool

text = ["How to sell drugs (fast)", "House of Cards", "The Crown"]

with Pool(40) as p: # I have 40 cores
    result = p.map(get_vec, text)

用 40 个内核处理 1000 万条短文本花了我大约 80 秒。

于 2021-09-02T08:44:36.987 回答
1

这里的问题是fasttext对象显然不能被pickle,而Dask不知道如何在不进行pickle的情况下序列化和反序列化这个数据结构。

在这里使用 Dask 的最简单方法(但可能不是最有效的)是让每个进程定义ft模型本身,这将避免传输它的需要(从而避免尝试酸洗)。像下面这样的东西会起作用。请注意,这ft是在跨分区映射的函数内部定义的。

首先,一些示例数据。

import dask.dataframe as dd
import fasttext
import pandas as pd
import dask
import numpy as np

df = pd.DataFrame({"text":['this is a test sentence', None, 'this is another one.', 'one more']})
ddf = dd.from_pandas(df, npartitions=2)
ddf

Dask DataFrame Structure:
text
npartitions=2   
0   object
2   ...
3   ...
Dask Name: from_pandas, 2 tasks

接下来,我们可以调整您的功能以ft在每个流程中定义。这重复了工作,但避免了转移模型的需要。有了它,我们就可以通过map_partitions.

def get_embeddings(sent, model):
    return model.get_sentence_vector(sent) if not pd.isna(sent) else np.zeros(10)

def func(df):
    ft = fasttext.load_model("amazon_review_polarity.bin") # arbitrary model
    res = df['text'].apply(lambda x: get_embeddings(x, model=ft))
    return res

ddf['sentence_vector'] = ddf.map_partitions(func)
ddf.compute(scheduler='processes')

text    sentence_vector
0   this is a test sentence [-0.01934033, 0.03729743, -0.04679677, -0.0603...
1   None    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2   this is another one.    [-0.0025579212, 0.0353713, -0.027139299, -0.05...
3   one more    [-0.014522496, 0.10396308, -0.13107553, -0.198...

请注意,这种嵌套数据结构(列中的列表)可能不是处理这些向量的最佳方式,但这取决于您的用例。此外,可能有一种方法可以使用fastext而不是一次一行(在 Python 中)分批进行此计算,但我并不精通fastext.

于 2020-04-26T04:30:02.273 回答