1

我正在尝试使用该multiprocessing.Pool对象并行运行一些数据库查询。我正在使用 MySQLdb。

我有一些模块级函数,我在其中定义要运行的查询,如下所示:

def check_foo(cursor, table):
    query = "(some query)"
    cursor.execute(query)
    results = cursor.fetchall()
    return len(results) == 0

这些函数是在程序运行时收集的,如下所示:

if __name__ == '__main__':
    check_functions = [v for k, v in globals().items()
                             if k.startswith('check_') and callable(v)]

我还有一个模块级函数,它在表列表上运行特定的检查函数:

def run_check_on_all((tables, cursor, f)):
    return [f(cursor, table) for table in tables]

我想为每个调用run_check_on_all该函数的检查函数设置一个工作进程。这是我的尝试:

if __name__ == '__main__':
    ...

    pool = multiprocessing.Pool(len(check_functions))
    cursors = [conn.cursor() for i in range(len(check_functions))]

    print "Running {0} check(s)...".format(len(check_functions))
    table_lists = [table_list] * len(check_functions)
    all_results = pool.map(run_check_on_all, zip(table_lists, cursors, check_functions))

当我尝试运行它时,我收到以下错误:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Python2.6/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/local/Python2.6/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/Python2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

正如您可以(希望)看到的那样,调用中涉及的任何内容都不pool.map是实例方法。run_check_on_all并且每个check_functions都是模块级功能。table_lists是字符串列表的列表。cursors是 MySQLdb 游标对象的列表。

我想这可能与在检查函数中调用游标对象的实例方法有关,但我用这样的虚拟函数替换了它们

def check_foo(cursor, table):
    print "hello"

仍然没有运气。

错误所指的实例方法在哪里?

4

1 回答 1

1

The problem is that you attempt to pass database cursor objects between processes. Each process must create a connection to the database, and create a cursor on that connection.

于 2013-06-14T07:00:11.577 回答