3

我试图在 multiprocessing.pool.map() 调用的函数中按顺序递增一个数字。当我运行以下代码时,我得到的数字增加了与每个数字的池相同的次数。

import time
import multiprocessing
import decimal
import random

lists = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h','i', 'j', 'k']
def thefunction(listi):
    global number
    number += 1
    time.sleep(decimal.Decimal(random.random()))
    print time.strftime('%H:%M:%S'), number, listi

number = 0
pool = multiprocessing.Pool(4)
pool.map(thefunction, lists)
print number

结果像这样打印出来

01:01:28 1 b
01:01:28 2 e
01:01:28 1 a
01:01:28 1 c
01:01:28 1 d
01:01:28 2 h
01:01:29 2 i
01:01:29 2 g
01:01:29 3 f
01:01:29 3 j
01:01:29 3 k
0

如何正确增加数字?

(time.sleep(decimal.Decimal(random.random())) 仅用于停止脚本打印到同一行)

4

2 回答 2

4

该示例不起作用的原因是正在分别创建和递增计数器的几个实例。

您需要创建一个共享计数器和锁,并为每个启动的进程适当地初始化:

import time
from multiprocessing import Pool, Value, Lock
import decimal
import random

number = Value('i', 0)
lock = Lock()
lists = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h','i', 'j', 'k']

def thefunction(listi):
    time.sleep(decimal.Decimal(random.random()))
    with lock:
        number.value += 1
        print time.strftime('%H:%M:%S'), number.value, listi

def initializer(*args):
    global number, lock
    number, lock = args

pool = Pool(4, initializer, (number, lock))
pool.map(thefunction, lists)
print number.value
于 2012-08-25T01:26:29.563 回答
2

您可能想要一个multiprocessing.Value共享状态。

或者更确切地说,这会满足您的要求,但可能不是您真正想要的;并行上下文中的共享状态通常是设计缺陷的标志。您可以做的一件事是让每个进程跟踪它已处理的项目数量,并将计数返回给父进程;然后,父母可以合计每个人完成的工作数量,以理清到目前为止已经完成了多少工作。

您尝试跨流程边界计数的原因是什么?

于 2012-08-25T00:38:06.903 回答