我有许多泡菜文件,一个用于 2005 年到 2010 年之间的每个日期。每个文件都包含一个单词字典,其中包含该日期各自的频率。我还有一个“主文件”,其中包含整个期间的所有唯一单词。总共大约有500万字。
我需要获取所有这些数据并为每个单词生成一个 CSV 文件,每个日期将有一行。例如,例如文件some_word.txt
:
2005-01-01,0.0003
2005-01-02,0.00034
2005-01-03,0.008
我在用 luigi 框架组织这个过程时遇到了麻烦。我当前的顶级任务需要一个单词,查找每个日期的相关频率并将结果存储在 CSV 文件中。我想我可以遍历我的主文件中的每个单词并使用该单词运行任务,但我估计这需要几个月甚至更长的时间。这是我简化版本的顶级AggregateTokenFreqs
任务。
class AggregateTokenFreqs(luigi.Task):
word = luigi.Parameter()
def requires(self):
pass # not sure what to require here, master file?
def output(self):
return luigi.LocalTarget('data/{}.csv'.format(self.word))
def run(self):
results = []
for date_ in some_list_of_dates:
with open('pickles/{}.p'.format(date_), 'rb') as f:
freqs = pickle.load(f)
results.append((date_, freqs.get(self.word))
# Write results list to output CSV file