5

我有一个模糊字符串匹配脚本,它在 400 万个公司名称的大海捞针中寻找大约 30K 根针。虽然脚本运行良好,但由于内存不足,我尝试通过 AWS h1.xlarge 上的并行处理来加快速度的尝试失败了。

我不想像在回答我之前的问题时解释的那样尝试获得更多内存,而是想了解如何优化工作流程 - 我对此很陌生,所以应该有足够的空间。顺便说一句,我已经尝试过队列(也工作过,但遇到了同样的MemoryError问题,另外还查看了一堆非常有用的 SO 贡献,但还没有完全实现。

这似乎与代码最相关。我希望它充分阐明了逻辑-很高兴根据需要提供更多信息:

def getHayStack():
    ## loads a few million company names into id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads subset of 30K companies into id: name dict (for allocation to workers)
    return needleCompanies

def findNeedle(needle, haystack):
    """ Identify best match and return results with score """
    results = {}
    for hayID, hayCompany in haystack.iteritems():
        if not isnull(haystack[hayID]):
            results[hayID] = levi.setratio(needle.split(' '), 
                                           hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    needleID = resultIDs[scores.index(max(scores))]
    return [needleID, haystack[needleID], max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for poolWorker batch"""
    batch, first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getTargets(first, last)
    needles = defaultdict(list)
    current = first
    for needleID, needleCompany in needleCompanies.iteritems():
        current += 1
        needles[targetID] = findNeedle(needleCompany, hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes = numProcesses)
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    pool.map_async(runMatch, 
                   itertools.izip(itertools.repeat(targetsPerBatch),
                                  xrange(0, 
                                         totalTargets,
                                         targetsPerBatch))).get(99999999)
    pool.close()
    pool.join()

所以我想问题是:我怎样才能避免为所有工人加载干草堆 - 例如通过共享数据或采取不同的方法,比如将更大的干草堆分配给工人而不是针?我怎样才能通过避免或消除混乱来提高内存使用率?

4

1 回答 1

4

你的设计有点混乱。您正在使用 N 个工作人员池,然后将您的 M 个工作分解为 N 个大小为 M/N 的任务。换句话说,如果你得到了所有正确的,你正在模拟在工作进程之上构建的池之上的工作进程。为什么要为此烦恼?如果要使用进程,直接使用即可。或者,将池用作池,将每个作业作为其自己的任务发送,并使用批处理功能以某种适当(且可调整)的方式对它们进行批处理。

这意味着runMatch只需要一个 needleID 和 needleCompany,它所做的只是调用findNeedle然后执行该# Then store results部分的任何操作。然后主程序变得简单得多:

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        results = pool.map_async(runMatch, needleCompanies.iteritems(), 
                                 chunkSize=NUMBER_TWEAKED_IN_TESTING).get()

或者,如果结果很小,而不是让所有进程(大概)争夺一些共享的结果存储事物,只需返回它们。然后你根本不需要runMatch,只需要:

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        for result in pool.imap_unordered(findNeedle, needleCompanies.iteritems(), 
                                          chunkSize=NUMBER_TWEAKED_IN_TESTING):
            # Store result

或者,或者,如果您确实想做 N 个批次,只需为每个批次创建一个 Process:

if __name__ == '__main__':
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    processes = [Process(target=runMatch, 
                         args=(targetsPerBatch,
                               xrange(0, 
                                      totalTargets,
                                      targetsPerBatch))) 
                 for _ in range(numProcesses)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

此外,您似乎getHayStack()为每个任务调用一次(getNeedles以及)。我不确定同时获得多个实时副本有多容易,但考虑到它是迄今为止您拥有的最大数据结构,这将是我尝试排除的第一件事。事实上,即使它不是内存使用问题,getHayStack也很容易对性能造成很大影响,除非您已经在进行某种缓存(例如,第一次将其显式存储在全局或可变默认参数值中,然后只是使用它),所以无论如何它可能值得修复。

一次解决这两个潜在问题的一种方法是在Pool构造函数中使用初始化程序:

def initPool():
    global _haystack
    _haystack = getHayStack()

def runMatch(args):
    global _haystack
    # ...
    hayCompanies = _haystack
    # ...

if __name__ == '__main__':
    pool = Pool(processes=numProcesses, initializer=initPool)
    # ...

接下来,我注意到您在实际上不需要它们的多个位置显式生成列表。例如:

scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID, haystack[needleID], max(scores)]

如果有多个结果,这是浪费;直接使用results.values()iterable即可。(事实上​​,看起来您使用的是 Python 2.x,在这种情况下keys,并且已经values是列表,所以您只是无缘无故地制作了一个额外的副本。)

但在这种情况下,您可以进一步简化整个事情。您只是在寻找得分最高的键(resultID)和值(score),对吗?所以:

needleID, score = max(results.items(), key=operator.itemgetter(1))
return [needleID, haystack[needleID], score]

这也消除了所有重复搜索score,这应该会节省一些 CPU。


这可能不会直接解决内存问题,但它应该有望使调试和/或调整更容易。

尝试的第一件事就是使用小得多的批次——而不是 input_size/cpu_count,尝试 1. 内存使用量是否下降?如果不是,我们已经排除了这部分。

接下来,试着sys.getsizeof(_haystack)看看它说了什么。如果它是,比如说,1.6GB,那么你把所有东西都压缩到 0.4GB 是非常好的,所以这就是攻击它的方法——例如,使用shelve数据库而不是普通的dict.

还可以尝试在初始化函数的开始和结束处转储内存使用量(使用resource模块, )。getrusage(RUSAGE_SELF)如果最后的干草堆只有 0.3GB,但你又分配了 1.3GB 来构建它,那就是要攻击的问题。例如,您可能会分离一个子进程来构建和腌制字典,然后让池初始化程序打开它并取消腌制它。或者将两者结合起来——在第一个孩子中构建一个shelve数据库,并在初始化程序中以只读方式打开它。无论哪种方式,这也意味着您只进行一次 CSV 解析/字典构建工作,而不是 8 次。

另一方面,如果您的总 VM 使用率仍然很低(请注意,getrusage无法直接查看您的总 VM 大小—<code>ru_maxrss 通常是一个有用的近似值,特别是如果ru_nswap为 0)在第一个任务时运行,问题出在任务本身。

首先,getsizeof任务函数的参数和您返回的值。如果它们很大,特别是如果它们在每个任务中不断变大或变化很大,则可能只是酸洗和解酸数据占用过多内存,最终它们中的 8 个加在一起大到足以达到极限。

否则,问题很可能出在任务功能本身。要么您有内存泄漏(您只能通过使用有缺陷的 C 扩展模块或 来实现真正的ctypes泄漏,但是如果您在调用之间保留任何引用,例如,在全局中,您可能只是不必要地永远持有事物),或者某些任务本身占用了太多内存。无论哪种方式,这应该是您可以通过拉出多处理并直接运行任务来更轻松地测试的东西,这更容易调试。

于 2013-09-10T00:29:08.343 回答