88

I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to concurrently analyze lots of files. Each time a file has been processed I'd like to update a counter so that I can keep track of how many files remain to be processed. Here is sample code:

import os
import multiprocessing

counter = 0


def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print(counter)


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

I can't find a solution for this.


6 Answers

84

The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note that it uses global values which I would really try to avoid in practice:

from multiprocessing import Pool, Value

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())
Answered 2010-01-17T10:29:54.523
44

A Counter class without the race-condition bug:

import multiprocessing


class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
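A usage sketch for this class (the worker-initializer pattern and the names `init`/`work` are my own, not part of the answer): because the shared `Value` inside `Counter` has to reach the workers at process-creation time rather than through `map` arguments, one option is to hand the counter to each pool worker via the pool's `initializer`:

```python
import multiprocessing


class Counter(object):
    """Process-safe counter backed by a shared Value, as in the answer above."""

    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value


counter = None


def init(c):
    # Runs once in each worker process; stores the shared counter globally.
    global counter
    counter = c


def work(x):
    counter.increment()
    return x * 2


def main():
    c = Counter()
    # The counter is passed at pool creation; shared ctypes objects cannot
    # be pickled into pool.map arguments directly.
    with multiprocessing.Pool(2, initializer=init, initargs=(c,)) as pool:
        results = pool.map(work, range(10))
    return c.value, sum(results)


if __name__ == '__main__':
    print(main())  # (10, 90)
```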
Answered 2014-02-10T15:42:38.783
14

A very simple example, modified from jkp's answer:

from multiprocessing import Pool, Value
from time import sleep

counter = Value('i', 0)
def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x

with Pool(4) as p:
    r = p.map(f, range(1000*1000))
Answered 2019-10-06T11:07:01.093
7

A faster Counter class that avoids using Value's built-in lock twice:

import multiprocessing


class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value
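A usage sketch for this class (the `work` helper and the process/iteration counts are my own): the counter object can be passed directly to `multiprocessing.Process`, and thanks to the explicit lock the final total is deterministic:

```python
import multiprocessing


class Counter(object):
    """RawValue plus an explicit Lock, as in the answer above."""

    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value


def work(counter, n):
    # Each child process increments the shared counter n times.
    for _ in range(n):
        counter.increment()


def main():
    counter = Counter()
    # Four processes, each incrementing 1000 times.
    processes = [
        multiprocessing.Process(target=work, args=(counter, 1000))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(main())  # 4000
```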

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Answered 2017-11-29T22:00:28.140
0

Here is a solution to the problem based on an approach different from those proposed in the other answers. It uses message passing with multiprocessing.Queue objects (instead of shared memory with multiprocessing.Value objects) and process-safe (atomic) built-in increment and decrement operators += and -= (instead of introducing custom increment and decrement methods), since you asked for it.

First, we define a class Subject for instantiating an object that will be local to the parent process and whose attributes are to be incremented or decremented:

import multiprocessing
import os


class Subject:

    def __init__(self):
        self.x = 0
        self.y = 0

Then we define a class Proxy for instantiating an object that will be the remote proxy through which the child processes request the parent process to retrieve or update the attributes of the Subject object. The inter-process communication uses two multiprocessing.Queue attributes, one for exchanging requests and one for exchanging responses. Requests are of the form (pid, action, *args), where pid is the child process identifier, action is the operation ('get', 'set', 'increment', 'decrement' the value of an attribute, or 'stop'), and *args are the target attribute and the operand. Responses are of the form value (to 'get' requests):

class Proxy(Subject):

    def __init__(self, requests, responses):
        self.__requests = requests
        self.__responses = responses

    def __getter(self, target):
        self.__requests.put((os.getpid(), 'get', target))
        value = self.__responses.get()
        return Decorator(value)

    def __setter(self, value, target):
        action = getattr(value, 'action', 'set')
        self.__requests.put((os.getpid(), action, target, value))

    @property
    def x(self):
        return self.__getter('x')

    @property
    def y(self):
        return self.__getter('y')

    @x.setter
    def x(self, value):
        self.__setter(value, 'x')

    @y.setter
    def y(self, value):
        self.__setter(value, 'y')

    def stop(self):
        self.__requests.put((os.getpid(), 'stop'))

Then we define a class Decorator to decorate the int objects returned by the getters of a Proxy object, in order to inform its setters whether the increment or decrement operators += and -= have been used, by adding an action attribute, in which case the setters request an 'increment' or 'decrement' operation instead of a 'set' operation. The increment and decrement operators += and -= call the corresponding augmented assignment special methods __iadd__ and __isub__ if they are defined, and fall back on the addition and subtraction special methods __add__ and __sub__, which are always defined for int objects (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value), which is equivalent to proxy.x = type(proxy).x.__get__(proxy).__iadd__(value), which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy).__iadd__(value))):

class Decorator(int):

    def __iadd__(self, other):
        value = Decorator(other)
        value.action = 'increment'
        return value

    def __isub__(self, other):
        value = Decorator(other)
        value.action = 'decrement'
        return value

Then we define the function worker that will be run in the child processes and that requests the increment and decrement operations:

def worker(proxy):
    proxy.x += 1
    proxy.y -= 1
    proxy.stop()

Finally we write the code that will be run only in the parent process:

if __name__ == '__main__':
    requests = multiprocessing.Queue()
    responses = multiprocessing.Queue()
    subject = Subject()
    proxy = Proxy(requests, responses)
    processes = [
        multiprocessing.Process(target=worker, args=(proxy,)) for _ in range(4)
    ]
    running = len(processes)
    for process in processes:
        process.start()
    while running > 0:
        pid, action, *args = requests.get()
        print(pid, action, *args)
        if action == 'stop':
            running -= 1
        elif action == 'get':
            responses.put(getattr(subject, args[0]))
        elif action == 'set':
            setattr(subject, args[0], args[1])
        elif action == 'increment':
            setattr(subject, args[0], getattr(subject, args[0]) + args[1])
        elif action == 'decrement':
            setattr(subject, args[0], getattr(subject, args[0]) - args[1])
    for process in processes:
        process.join()
    assert subject.x == 4
    assert subject.y == -4

The final assertions ensure that += and -= are process-safe. You can remove the process-safety by commenting out the corresponding __iadd__ or __isub__ method of Decorator and observe the corresponding assertion fail (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value), but falls back on proxy.x = proxy.x.__add__(value) if __iadd__ is not defined, which is equivalent to proxy.x = proxy.x + value, which is equivalent to proxy.x = type(proxy).x.__get__(proxy) + value, which is equivalent to type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy) + value), so the action attribute is not added and the setter requests a 'set' operation instead of an 'increment' operation).

Example of a process-safe session (atomic += and -=):

35845 get x
35844 get x
35845 increment x 1
35845 get y
35844 increment x 1
35845 decrement y 1
35844 get y
35845 stop
35844 decrement y 1
35844 stop
35846 get x
35846 increment x 1
35846 get y
35846 decrement y 1
35846 stop
35847 get x
35847 increment x 1
35847 get y
35847 decrement y 1
35847 stop

Example of a process-unsafe session (non-atomic += and -=):

51101 get x
51101 set x 1
51101 get y
51101 set y -1
51101 stop
51100 get x
51099 get x
51100 set x 2
51100 get y
51099 set x 2
51099 get y
51100 set y -2
51099 set y -2
51100 stop
51099 stop
51102 get x
51102 set x 3
51102 get y
51102 set y -3
51102 stop
Traceback (most recent call last):
  File "/Users/maggyero/Desktop/foo.py", line 92, in <module>
    assert subject.x == 4
AssertionError
Answered 2022-02-13T17:40:35.290
-1

I was working on a progress bar in PyQt5, so I used a thread and a pool together:

import threading
import multiprocessing as mp
from queue import Queue

def multi(x):
    return x * x

def pooler(q):
    with mp.Pool() as pool:
        count = 0
        for i in pool.imap_unordered(multi, range(100)):
            print(count, i)
            count += 1
            q.put(count)

def main():
    q = Queue()
    t = threading.Thread(target=pooler, args=(q,))
    t.start()
    print('start')
    process = 0
    while process < 100:
        process = q.get()
        print('p', process)

if __name__ == '__main__':
    main()

I put this in a QThread worker and it works with acceptable latency.

Answered 2019-10-24T07:28:03.237