
I am building a python module to extract tags from a large corpus of text, and while its results are of high quality, it executes very slowly. I am trying to speed the process up by using multiprocessing, and that also worked, until I tried to introduce a lock so that only one process at a time connects to our database. I can't figure out for the life of me how to make this work - despite much searching and tweaking, I still get a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here is the offending code - it worked fine until I tried to pass the lock object as an argument to f.

from functools import partial
from multiprocessing import Manager

def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize 
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock) 

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)

1 Answer


Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.
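On Python 3 the same root cause surfaces as a RuntimeError rather than Python 2's PicklingError, but it is easy to confirm that the lock itself is the unpicklable part:

```python
import multiprocessing
import pickle

# Attempting to pickle a multiprocessing lock always fails: locks are
# OS-level objects and cannot be serialized, only inherited by children.
try:
    pickle.dumps(multiprocessing.Lock())
    failed = False
except RuntimeError as exc:
    failed = True
    print(exc)
```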

  • To avoid this, you can make your lock variable a global variable. Then you will be able to reference it inside the pool worker function directly as a global, rather than having to pass it as an argument. This works because Python uses the OS fork mechanism when creating the pool processes, so the entire contents of the process that creates the pool are copied into them. This is the only way of passing a lock to Python processes created with the multiprocessing package. Incidentally, there is no need to use the Manager class just for this lock. With this change your code would look like this:

    import multiprocessing
    from functools import partial

    lock = None  # Global definition of lock
    pool = None  # Global definition of pool


    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    def get_more_tags(tag, max_tags=2):
        # The lock is referenced directly as a global instead of being
        # received as an argument, so it never has to be pickled.
        global lock
        with lock:
            pass  # the serialized database query goes here


    def make_network(initial_tag, max_tags=2, max_iter=3):
        global lock
        global pool
        lock = multiprocessing.Lock()
        pool = multiprocessing.Pool(8)

        f = partial(get_more_tags, max_tags=max_tags)

        def _recursively_find_more_tags(tags, level):
            if level >= max_iter:
                raise StopIteration
            new_tags = pool.map(f, tags)
            to_search = []
            for i, s in zip(tags, new_tags):
                for t in s:
                    joined = ' '.join(t)
                    print(i + "|" + joined)
                    to_search.append(joined)
            try:
                return _recursively_find_more_tags(to_search, level + 1)
            except StopIteration:
                return None

        _recursively_find_more_tags([initial_tag], 0)
    

In your real code, the lock and pool variables might be class instance variables.

  • A second solution, which avoids locks entirely but may carry slightly higher overhead, is to create another process with multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database queries. You would use the queue to let the pool processes send parameters to the process managing the database queries. Since all the pool processes share the same queue, access to the database is automatically serialized. The additional overhead comes from pickling/unpickling the query parameters and responses. Note that queue objects must be handed to the pool workers when those workers are created (for example through the pool's initializer argument), not passed through Pool.map. Note also that the multiprocessing.Lock-based solution will not work on Windows, where processes are not created with fork semantics.
answered 2013-07-31T09:36:38.780