我试图使用 dask 的并行化机制为 8000 万条英语推文获取 fastText 句子嵌入,如本答案所述: How do you parallelize apply() on Pandas Dataframes using all cores on one machine?
这是我的完整代码:
import dask.dataframe as dd
from dask.multiprocessing import get
import fasttext
import fasttext.util
import pandas as pd
print('starting langage: ' + 'en')
lang_output = pd.DataFrame()
lang_input = full_input.loc[full_input.name == 'en'] # 80 Million English tweets
ddata = dd.from_pandas(lang_input, npartitions = 96)
print('number of lines to compute: ' + str(len(lang_input)))
fasttext.util.download_model('en', if_exists='ignore') # English
ft = fasttext.load_model('cc.'+'en'+'.300.bin')
fasttext.util.reduce_model(ft, 20)
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
print('finished en')
这是get_fasttext_sentence_embedding函数:
def get_fasttext_sentence_embedding(row, ft):
if pd.isna(row):
return np.zeros(20)
return ft.get_sentence_vector(row)
但是,我在这一行得到一个酸洗错误:
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
这是我得到的错误:
TypeError: can't pickle fasttext_pybind.fasttext objects
有没有办法将 fastText 模型 get_sentence_vector 与 dask (或其他任何东西)并行化?我需要并行化,因为获取 8000 万条推文的句子嵌入需要两个时间,而且我的数据框的一行完全独立于另一行。