
I am practicing multiprocessing with shared values. I have an existing Process-based function that works with a shared value:

def run_procs_with_loop(lock):

    # this is my shared value 
    shared_number = Value('i', 0)

    print(__name__, 'shared_value in the beginning', shared_number.value)

    # create a process list to append each process spawned by the for- loop
    processes = []
    for _ in range(2):
        p = Process(target=add_100_locking, args=(shared_number, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print('shared_value at the end', shared_number.value)


The code above spawns two processes, each targeting a function with args (shared_number, lock). The function works as expected.

I am trying to convert this to a multiprocessing pool. I tried passing the argument `[shared_number, lock] * 2` in my pool.map() statement (I want the pool to spawn just two processes), but Python rejects it:

def run_procs_with_pool(lock):

    shared_number = Value('i', 0)
    print(__name__, 'shared_value in the beginning', shared_number.value)

    # create processes using multiprocessing.Pool
    pool = Pool()
    pool.map(add_100_with_lock, [(shared_number,lock)] * 2)

    print('shared_value at the end', shared_number.value)


Thanks in advance for any helpful input.


Update:

Someone suggested I use starmap instead of map, but then I get the error `RuntimeError: Synchronized objects should only be shared between processes through inheritance`. It seems multiprocessing.Pool does not allow shared values to be passed this way?
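For context, the "inheritance" the error message refers to means handing the shared object to each worker at creation time rather than through the task arguments. A minimal, hedged sketch of that approach using Pool's initializer hook (the names here are illustrative, not from the original code):

```python
from multiprocessing import Pool, Value, Lock

def init_worker(shared, l):
    # runs once inside each worker; exposes the inherited objects as globals
    global num, lock
    num = shared
    lock = l

def add_100(_):
    # increment the inherited shared counter 100 times under the lock
    for _ in range(100):
        with lock:
            num.value += 1

def run():
    shared = Value('i', 0)
    l = Lock()
    # the Value and Lock reach the workers via initargs (inheritance),
    # never through map's argument pickling
    with Pool(2, initializer=init_worker, initargs=(shared, l)) as pool:
        pool.map(add_100, range(2))
    return shared.value

if __name__ == '__main__':
    print(run())  # 200
```

This sidesteps the RuntimeError because the synchronized objects never travel through the task queue's pickler.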

I thought I would share the task function add_100_with_lock, shown below:

def add_100_with_lock(num, lock):
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1    

Is there a way to make passing shared values work with multiprocessing.Pool?


2 Answers


I was finally able to work around the multiprocessing Pool's restriction on shared variables by using a Manager() object. Per the Python docs: Managers provide a way to create data "which can be shared between different processes", including sharing over a network between processes running on different machines.

Here is how I did it:

    # use the manager class to share objects between processes
    manager = Manager()
    shared_number = manager.Value('i', 0)

Since I will now only be passing shared_number (the lock object is passed at pool-creation time using the initializer= kwarg; you can read all about it in this discussion of multiprocessing Lock()), I can go back to using pool.map() instead of pool.starmap().

Here is the complete working module:

from  multiprocessing import Lock, Pool, Manager
import time

# init function passed to Pool initializer to share multiprocessing.Lock() object to worker processes
def init_lock(l, ):
    global lock
    lock = l

def add_100_with_lock(num):

    # The pool runs this task in TWO worker processes, and both share the
    # 'num' variable, so 'num' will be 200 once both tasks finish
    # (100 increments * 2 tasks = 200).
    # Locking avoids race conditions between the worker processes.
    for _ in range(100):
        time.sleep(0.001)
        with lock:
            num.value += 1

# Pool method 
def run_procs_lock_with_pool():
    
    # use the manager class to share objects between processes
    manager = Manager()
    shared_number = manager.Value('i', 0)

    print(__name__, 'shared_value in the beginning', shared_number.value)

    # like shared values, locks cannot be shared in a Pool - instead, pass the 
    # multiprocessing.Lock() at Pool creation time, using the initializer=init_lock.
    # This will make your lock instance global in all the child workers.
    # The init_lock is defined as a function - see init_lock() at the top.
    l = Lock()
    pool = Pool(2, initializer=init_lock, initargs=(l,))
    # the iterable has two items, so the pool runs the task twice
    pool.map(add_100_with_lock, [shared_number]*2)
    pool.close()
    pool.join()


    print('shared_value at the end', shared_number.value)


if __name__ == '__main__':

    run_procs_lock_with_pool()
Answered 2022-02-17T18:16:36.500

When you write

pool.map(add_100_with_lock, [(shared_number,lock)] * 2)

the iterable you are passing as a parameter is a list of tuples, so add_100_with_lock will not get two parameters but a single tuple, as if you had called add_100_with_lock((shared_number, lock)) instead of add_100_with_lock(shared_number, lock). Pool.map is designed for functions that take a single parameter.

You can change the definition of add_100_with_lock, although I do not recommend this solution. You can also wrap it in another function that receives a tuple, unpacks it, and forwards the call:

def wrap_add_100(args):
    return add_100_with_lock(*args)
...
pool.map(wrap_add_100, [(shared_number,lock)] * 2)

or use Pool.starmap, which expects an iterable of argument tuples and unpacks each tuple as the parameters of one call:

pool.starmap(add_100_with_lock, [(shared_number, lock)] * 2)

This last option is what I recommend, since it preserves the function signature.
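Note, though, that a raw Value or Lock passed through starmap's argument list will still raise the RuntimeError mentioned in the question's update, because the objects get pickled onto the task queue. Manager proxies are picklable and avoid this. A hedged, self-contained sketch of starmap with Manager proxies (helper names are illustrative):

```python
from multiprocessing import Manager, Pool

def add_100_with_lock(num, lock):
    # num is a Manager ValueProxy, lock is a Manager lock proxy;
    # both survive pickling into the pool's task queue
    for _ in range(100):
        with lock:
            num.value += 1

def run():
    with Manager() as manager:
        shared_number = manager.Value('i', 0)
        lock = manager.Lock()
        with Pool(2) as pool:
            # one argument tuple per task, unpacked by starmap
            pool.starmap(add_100_with_lock, [(shared_number, lock)] * 2)
        return shared_number.value

if __name__ == '__main__':
    print(run())  # 200
```

This keeps the two-parameter function signature intact while satisfying the Pool's pickling requirements.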

Answered 2022-02-17T03:32:00.763