我最近开始尝试使用多处理来加速任务。我创建了一个执行模糊字符串匹配并使用不同算法计算分数的脚本(我想比较不同的匹配技术)。你可以在这里找到完整的源代码:https ://bitbucket.org/bergonzzi/fuzzy-compare/src 。作为输入,它需要 2 个文件,它们组合成对(file1 的每一行与 file2 的每一行)。对于每一对,计算模糊匹配分数。
我做了3个版本。使用我的 repo 中提供的示例数据(组合成对后由 697.340 个项目组成)运行,我有以下时间:
- 简单的单一流程 - 0:00:47
- 使用 Pool.map() 的多进程 - 0:00:13
- 使用队列的多进程(生产者/消费者模式) - 0:01:04
我试图理解为什么我的 Pool.map() 版本比我的 Queue 版本快得多,这实际上比简单的单进程版本要慢。
我什至尝试使用 Queues 的原因是 Pool.map() 版本会保留结果,直到一切都完成并且最后只写入文件。这意味着对于大文件,它最终会占用大量内存。我说的是这个版本(链接到它,因为这里要粘贴很多代码)。
为了解决这个问题,我将它重构为生产者/消费者模式(或至少尝试过)。在这里,我首先通过组合两个输入文件来生成作业,并将它们放入消费者处理的队列中(计算模糊匹配分数)。完成的作业被放入一个出队列。然后我有一个进程从这个队列中抓取完成的项目并将它们写入文件。这样,理论上,我不需要太多内存,因为结果会被刷新到磁盘。它似乎工作正常,但速度要慢得多。我还注意到,在 Mac OSX 上查看 Activity Monitor 时,我生成的 4 个进程似乎没有使用 100% 的 CPU(Pool.map() 版本不是这种情况)。
我注意到的另一件事是,我的生产者函数似乎正确填满了队列,但消费者进程似乎等到队列填满,而不是在第一个项目到达时立即开始工作。我可能在那里做错了什么......
作为参考,这里有一些队列版本的相关代码(尽管最好查看上面链接的 repo 中的完整代码)。
这是我的生产者功能:
def combine(list1, list2):
'''
Combine every item of list1 with every item of list 2,
normalize put the pair in the job queue.
'''
pname = multiprocessing.current_process().name
for x in list1:
for y in list2:
# slugify is a function to normalize the strings
term1 = slugify(x.strip(), separator=' ')
term2 = slugify(y.strip(), separator=' ')
job_queue.put_nowait([term1, term2])
这是编写器功能:
def writer(writer_queue):
out = open(file_out, 'wb')
pname = multiprocessing.current_process().name
out.write(header)
for match in iter(writer_queue.get, "STOP"):
print("%s is writing %s") % (pname, str(match))
line = str(';'.join(match) + '\n')
out.write(line)
out.close()
这是执行实际计算的工作函数(删除了大部分代码,因为它在这里没有任何区别,repo 上的完整源代码):
def score_it(job_queue, writer_queue):
'''Calculate scores for pair of words.'''
pname = multiprocessing.current_process().name
for pair in iter(job_queue.get_nowait, "STOP"):
# do all the calculations and put the result into the writer queue
writer_queue.put(result)
这就是我设置流程的方式:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()
workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []
print('Start matching with "%s", minimum score of %s and %s workers') % (
args.algorithm, minscore, workers)
# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()
# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()
for w in xrange(workers):
p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
p.start()
processes.append(p)
job_queue.put("STOP")
for p in processes:
p.join()
writer_queue.put("STOP")
我在这里读过很多关于多处理有时会变慢的文章,我知道这与创建和管理新进程的开销有关。此外,当要完成的工作不够“大”时,多处理的效果可能不可见。但是在这种情况下,我认为这项工作相当大,而且 Pool.map() 版本似乎证明了这一点,因为它要快得多。
在管理所有这些进程并传递队列对象时,我做错了什么吗?如何优化这一点,以便在处理结果时将结果写入文件,以最大限度地减少运行时所需的内存量?
谢谢!