
I'm trying to use the multiprocessing Pool object. I'd like each process to open a database connection when it starts, then use that connection to process the data that is passed in (rather than opening and closing the connection for each bit of data). This seems like what the initializer is for, but I can't wrap my head around how the worker and the initializer communicate. So I have something like this:

import psycopg2
from multiprocessing import Pool

def get_cursor():
    return psycopg2.connect(...).cursor()

def process_data(data):
    # here I'd like to have the cursor so that I can do things with the data
    pass

if __name__ == "__main__":
    pool = Pool(initializer=get_cursor, initargs=())
    pool.map(process_data, get_some_data_iterator())

How do I (or do I) get the cursor back from get_cursor() into process_data()?


5 Answers


The initializer function is called like this:

def worker(...):
    ...
    if initializer is not None:
        initializer(*args)

So the return value isn't saved anywhere. You might think this dooms you, but no! Each worker runs in a separate process, so you can use an ordinary global variable.

It's not exactly pretty, but it works:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...

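Now you can use cursor in your process_data function. Each process has its own cursor variable, separate from the ones in all the other processes, so they won't step on each other.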

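(I don't know whether psycopg2 has a different way to handle this that doesn't involve using multiprocessing in the first place; this is a general answer to a general problem with the multiprocessing module.)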
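A complete, runnable version of this global-variable pattern might look like the sketch below. It is an illustration under assumptions, not a drop-in: it swaps psycopg2 for the standard library's sqlite3 with an in-memory database (so it runs without a PostgreSQL server) and passes the connection target to the initializer through initargs; with psycopg2 you would call psycopg2.connect(...) inside the initializer instead.

```python
import sqlite3
from multiprocessing import Pool

# Module-level global: every worker process gets its own, independent copy.
cursor = None


def set_global_cursor(database):
    """Pool initializer: runs once in each worker process when it starts."""
    global cursor
    # Assumption: sqlite3 stands in for psycopg2 so the sketch runs without a server.
    cursor = sqlite3.connect(database).cursor()


def process_data(data):
    """Uses this worker's cursor; here it just asks SQLite to square the input."""
    cursor.execute("SELECT ? * ?", (data, data))
    return cursor.fetchone()[0]


if __name__ == "__main__":
    with Pool(processes=4, initializer=set_global_cursor, initargs=(":memory:",)) as pool:
        print(pool.map(process_data, range(10)))
    # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The initializer runs once per worker, and every task that worker handles afterwards reuses the same cursor.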

answered 2012-04-12T06:10:29.700

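You can also pass the function itself to the initializer, create the connection there, and attach the cursor to the function as an attribute.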

def init_worker(function):
    # Runs once in each worker process; db.conn() stands for opening your connection
    function.cursor = db.conn()

Now you can access the database through function.cursor without using a global variable, for example:

from multiprocessing import Pool

def use_db(i):
    print(use_db.cursor)  # process-local
if __name__ == "__main__":
    pool = Pool(initializer=init_worker, initargs=(use_db,))
    pool.map(use_db, range(10))
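A self-contained sketch of the function-attribute approach follows. As an assumption for illustration, sqlite3 with an in-memory database replaces the answer's db.conn(). It works because each worker has its own copy of the module-level use_db (functions are sent to workers by qualified name, not by value), so the attribute set by init_worker in a worker is the one use_db reads in that same worker.

```python
import sqlite3
from multiprocessing import Pool


def init_worker(function):
    """Pool initializer: attach a per-process cursor to the worker function."""
    # Assumption: sqlite3 replaces the answer's db.conn() so the sketch is self-contained.
    function.cursor = sqlite3.connect(":memory:").cursor()


def use_db(i):
    """Reads the cursor from its own function attribute instead of a global."""
    use_db.cursor.execute("SELECT ? + 1", (i,))
    return use_db.cursor.fetchone()[0]


if __name__ == "__main__":
    with Pool(processes=4, initializer=init_worker, initargs=(use_db,)) as pool:
        print(pool.map(use_db, range(5)))
    # prints [1, 2, 3, 4, 5]
```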
answered 2015-06-13T07:19:16.947

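torek has already explained well why the initializer's return value can't be used this way. However, I'm personally not a fan of global variables, so I'd like to post another solution here.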

The idea is to wrap the function in a class and initialize the class with the "global" variables.

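import psycopg2


class Processor(object):
  """Process the data and save it to the database."""

  def __init__(self, credentials):
    """Store only picklable state; map pickles the instance and sends it to the workers."""
    self.credentials = credentials
    self.cursor = None

  def __call__(self, data):
    """Connect on first use (inside the worker), then use the cursor with the data."""
    if self.cursor is None:
      # Each unpickled copy (one per chunk of work sent by map) opens its own connection
      self.cursor = psycopg2.connect(self.credentials).cursor()
    # Do something with self.cursor and data, e.g. self.cursor.execute(...)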

Then call it like this:

from multiprocessing import Pool
if __name__ == "__main__":
    p = Pool(5)
    p.map(Processor(credentials), list_of_data)

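So Processor(credentials) initializes the class with the credentials and returns an instance, and map calls that instance with each piece of data. Note that map pickles the instance and sends it to the workers along with every chunk of work, so the instance must be picklable at the point it is passed in (an open psycopg2 connection or cursor is not).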

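Although this isn't as simple as the global-variable solution, I strongly recommend avoiding global variables and encapsulating the state in some safer way. (I really hope they'll support lambda expressions someday; that would make things much easier...)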
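A self-contained variant of the class approach makes the pickling constraint visible. This sketch assumes sqlite3 in place of psycopg2 and opens the connection lazily: until its first call the instance holds only the database name, so it can be pickled and shipped to the workers, and the connection is created inside whichever process calls it.

```python
import pickle
import sqlite3
from multiprocessing import Pool


class Processor:
    """Callable that opens its connection lazily, inside whichever process calls it."""

    def __init__(self, database):
        # Store only picklable state: the instance is created in the parent
        # and pickled to the workers along with each chunk of work.
        self.database = database
        self.cursor = None

    def __call__(self, data):
        if self.cursor is None:
            # Assumption: sqlite3 stands in for psycopg2 so the sketch runs anywhere.
            self.cursor = sqlite3.connect(self.database).cursor()
        self.cursor.execute("SELECT ? * 10", (data,))
        return self.cursor.fetchone()[0]


if __name__ == "__main__":
    processor = Processor(":memory:")
    pickle.dumps(processor)  # succeeds: no connection has been opened yet
    with Pool(processes=4) as pool:
        print(pool.map(processor, range(5)))
    # prints [0, 10, 20, 30, 40]
```

Because map sends the callable with every chunk, each chunk gets a fresh unpickled copy; a larger chunksize therefore means fewer connections.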

answered 2016-05-14T23:26:36.783

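Since defining global variables in the initializer is generally undesirable, we can avoid them, and also avoid repeating an expensive initialization on every call, with simple caching in each child process: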

from functools import lru_cache
from multiprocessing.pool import Pool
from time import sleep


@lru_cache(maxsize=None)
def _initializer(a, b):
    print(f'Initialized with {a}, {b}')


def _pool_func(a, b, i):
    _initializer(a, b)
    sleep(1)
    print(f'got {i}')


if __name__ == "__main__":  # guard required when workers start with "spawn" or "forkserver"
    arg_a = 1
    arg_b = 2

    with Pool(processes=5) as pool:
        pool.starmap(_pool_func, ((arg_a, arg_b, i) for i in range(0, 20)))

Output:

Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
Initialized with 1, 2
got 1
got 0
got 4
got 2
got 3
got 5
got 7
got 8
got 6
got 9
got 10
got 11
got 12
got 14
got 13
got 15
got 16
got 17
got 18
got 19
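In the example above, _initializer only prints. To apply the same caching idea to the database question, the cached function can return the cursor itself. A sketch, assuming sqlite3 in place of psycopg2 and a hypothetical helper named get_cursor:

```python
import sqlite3
from functools import lru_cache
from multiprocessing import Pool


@lru_cache(maxsize=None)
def get_cursor(database):
    """Opens the connection on the first call in a process; later calls hit the cache."""
    # Assumption: sqlite3 stands in for psycopg2 so the sketch runs without a server.
    return sqlite3.connect(database).cursor()


def pool_func(database, i):
    cursor = get_cursor(database)  # same cursor object for every call in this process
    cursor.execute("SELECT ? - 1", (i,))
    return cursor.fetchone()[0]


if __name__ == "__main__":
    with Pool(processes=5) as pool:
        print(pool.starmap(pool_func, ((":memory:", i) for i in range(5))))
    # prints [-1, 0, 1, 2, 3]
```

One caveat: the cache lives in each process's memory, so with the fork start method a child inherits whatever the parent had already cached. Avoid calling get_cursor in the parent before the pool is created, or the workers would share the parent's connection.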
answered 2019-04-04T09:07:27.537

In case the first answer isn't clear, here is a snippet that runs:

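import multiprocessing

n_proc = 5
cursor = [0 for _ in range(n_proc)]

def set_global_cursor():
    # Runs once in each worker and changes only that worker's copy of the list.
    # _identity is a private attribute; for the first pool started by a fresh
    # parent process its values are 1..n_proc.
    cursor[multiprocessing.current_process()._identity[0] - 1] = 1

def process_data(data):
    print(cursor)
    return data**2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=n_proc, initializer=set_global_cursor) as pool:
        pool.map(process_data, list(range(10)))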

Output:

[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 1, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 0, 1]
[1, 0, 0, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 1, 0, 0]
[0, 0, 0, 1, 0]
[0, 1, 0, 0, 0]
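The same per-process isolation can be shown without relying on the private _identity attribute. In this sketch (the names calls, reset_calls and work are illustrative, not from the original), a module-level counter is returned together with the worker's PID: each process counts only the tasks it handled itself, and the parent's copy of the global is never modified.

```python
import os
from multiprocessing import Pool

calls = 0  # module-level state: each worker process has its own copy


def reset_calls():
    """Pool initializer: runs once per worker process."""
    global calls
    calls = 0


def work(x):
    """Counts how many tasks this particular process has handled so far."""
    global calls
    calls += 1
    return os.getpid(), calls


if __name__ == "__main__":
    with Pool(processes=3, initializer=reset_calls) as pool:
        results = pool.map(work, range(9), chunksize=1)
    per_worker = {}
    for pid, n in results:
        per_worker[pid] = max(per_worker.get(pid, 0), n)
    print(per_worker)                # tasks handled per worker PID
    print(sum(per_worker.values()))  # 9
    print(calls)                     # 0: the parent's copy was never touched
```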
answered 2021-08-07T08:53:03.093