6

我最近开始尝试使用多处理来加速任务。我创建了一个执行模糊字符串匹配并使用不同算法计算分数的脚本(我想比较不同的匹配技术)。你可以在这里找到完整的源代码:https ://bitbucket.org/bergonzzi/fuzzy-compare/src 。作为输入,它需要 2 个文件,它们组合成对(file1 的每一行与 file2 的每一行)。对于每一对,计算模糊匹配分数。

我做了3个版本。使用我的 repo 中提供的示例数据(组合成对后由 697.340 个项目组成)运行,我有以下时间:

  • 简单的单一流程 - 0:00:47
  • 使用 Pool.map() 的多进程 - 0:00:13
  • 使用队列的多进程(生产者/消费者模式) - 0:01:04

我试图理解为什么我的 Pool.map() 版本比我的 Queue 版本快得多,这实际上比简单的单进程版本要慢。

我什至尝试使用 Queues 的原因是 Pool.map() 版本会保留结果,直到一切都完成并且最后只写入文件。这意味着对于大文件,它最终会占用大量内存。我说的是这个版本(链接到它,因为这里要粘贴很多代码)。

为了解决这个问题,我将它重构为生产者/消费者模式(或至少尝试过)。在这里,我首先通过组合两个输入文件来生成作业,并将它们放入消费者处理的队列中(计算模糊匹配分数)。完成的作业被放入一个出队列。然后我有一个进程从这个队列中抓取完成的项目并将它们写入文件。这样,理论上,我不需要太多内存,因为结果会被刷新到磁盘。它似乎工作正常,但速度要慢得多。我还注意到,在 Mac OSX 上查看 Activity Monitor 时,我生成的 4 个进程似乎没有使用 100% 的 CPU(Pool.map() 版本不是这种情况)。

我注意到的另一件事是,我的生产者函数似乎正确填满了队列,但消费者进程似乎等到队列填满,而不是在第一个项目到达时立即开始工作。我可能在那里做错了什么......

作为参考,这里有一些队列版本的相关代码(尽管最好查看上面链接的 repo 中的完整代码)。

这是我的生产者功能:

def combine(list1, list2):
    '''
    Combine every item of list1 with every item of list 2,
    normalize put the pair in the job queue.
    '''
    pname = multiprocessing.current_process().name
    for x in list1:
        for y in list2:
            # slugify is a function to normalize the strings
            term1 = slugify(x.strip(), separator=' ')
            term2 = slugify(y.strip(), separator=' ')
            job_queue.put_nowait([term1, term2])

这是编写器功能:

def writer(writer_queue):
    out = open(file_out, 'wb')
    pname = multiprocessing.current_process().name
    out.write(header)
    for match in iter(writer_queue.get, "STOP"):
        print("%s is writing %s") % (pname, str(match))
        line = str(';'.join(match) + '\n')
        out.write(line)
    out.close()

这是执行实际计算的工作函数(删除了大部分代码,因为它在这里没有任何区别,repo 上的完整源代码):

def score_it(job_queue, writer_queue):
    '''Calculate scores for pair of words.'''
    pname = multiprocessing.current_process().name

    for pair in iter(job_queue.get_nowait, "STOP"):
        # do all the calculations and put the result into the writer queue
        writer_queue.put(result)

这就是我设置流程的方式:

# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()

workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers)

# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()

print("Job queue size: %s") % job_queue.qsize()

# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

for w in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)
    job_queue.put("STOP")

for p in processes:
    p.join()

writer_queue.put("STOP")

我在这里读过很多关于多处理有时会变慢的文章,我知道这与创建和管理新进程的开销有关。此外,当要完成的工作不够“大”时,多处理的效果可能不可见。但是在这种情况下,我认为这项工作相当大,而且 Pool.map() 版本似乎证明了这一点,因为它要快得多。

在管理所有这些进程并传递队列对象时,我做错了什么吗?如何优化这一点,以便在处理结果时将结果写入文件,以最大限度地减少运行时所需的内存量?

谢谢!

4

1 回答 1

2

我认为您的时间问题是您的多线程队列版本缺少优化。您发表的评论基本上是说您的 job_queue 在工作线程开始从中获取作业之前已填满。我相信这是因为你在#Fill up job queue 中的 c.join() 。这可以防止主线程继续运行,直到作业队列已满。我会将 c.join() 移到 p.join() 之后的末尾。您还需要想办法让您的停止标志进入队列末尾。combine 函数可能是放置它的好地方。在用完要组合的数据后添加 x 个停止标志的方法。

另一件需要注意的事情:您在您的 for 循环范围内编写了您的 w 变量,该循环启动了 p 进程。作为样式/可读性/等问题,我会将 w 更改为不同的变量名。如果你不使用它,下划线可以作为一个很好的一次性变量名。IE

for w in xrange(workers):

应该成为

for _ in xrange(workers):

长话短说,如果你将 c.join() 移到最后,你应该得到更准确的时间。目前,唯一的多线程是字符串的模糊匹配。拥有生产者/消费者线程的优点之一是消费者线程不必等到生产者线程完成,因此您最终会使用更少的内存。

于 2014-12-10T16:12:22.873 回答