2

我有一个多进程程序,可以获取所有响应时间不同的网页。结果按照先进先出规则存储在进程队列中。我想通过进程号识别队列中的结果。这是我的测试平台,也是迄今为止我使用两个队列所取得的成果。还有什么办法吗?我曾尝试使用全局列表来存储结果,但两个进程似乎并不共享相同的内存空间。

#!/usr/bin/python3.2

import time
from multiprocessing import Process, Queue

def myWait(processNb, wait, resultQueues):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    resultQueues[processNb].put('Process %s started at %s wait %s ended at %s' % (processNb, startedAt, wait, endedAt))

# queue initialisation
resultQueues = [Queue(), Queue()]

# process creation arg: (process number, sleep time, queue)
proc =  [
    Process(target=myWait, args=(0, 2, resultQueues,)),
    Process(target=myWait, args=(1, 1, resultQueues,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
print(resultQueues[0].get())
print(resultQueues[1].get())
4

2 回答 2

2

不,当您使用时,进程根本不会共享地址空间multiprocessing- 这与threading所有进程共享内存的位置不同。这意味着您想要在进程之间共享的任何内容都必须通过进程之间的显式连接,例如Queue.

如果您想要组合所有进程的结果,您实际上可以只使用一个结果队列——多个进程(和多个线程)一次访问它们是非常安全的。然后,您的所有工作人员都可以将他们的结果插入到该队列中,并且主进程可以在它们进入时读取它们。

这是您将上面的代码修改为使用单个队列:

#!/usr/bin/python3.2

import time
from multiprocessing import Process, Queue

def myWait(processNb, wait, results):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    results.put('Process %s started at %s wait %s ended at %s' % (processNb, startedAt, wait, endedAt))

# queue initialisation
results = Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    Process(target=myWait, args=(0, 2, results,)),
    Process(target=myWait, args=(1, 1, results,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
print(results.get())
print(results.get())

如果您想在不读取字符串的情况下识别每个结果的来源过程,您可以轻松地将其添加为 2 元组。这将更改代码如下(我只显示了更改的部分):

import time
import multiprocessing
import queue

def myWait(processNb, wait, results):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    results.put((processNb, 'Process %s started at %s wait %s ended at %s' % (processNb, startedAt, wait, endedAt)))

# queue initialisation
results = multiprocessing.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    multiprocessing.Process(target=myWait, args=(0, 2, results,)),
    multiprocessing.Process(target=myWait, args=(1, 1, results,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
while True:
    try:
        processNb, message = queue.get_nowait()
        print "Process %d sent: %s" % (processNb, message)
    except queue.Empty:
        break

这有帮助吗?

编辑:正如另一位响应者非常正确地指出的那样,传递更多结构化数据可能比传递字符串更好,但出于解释目的,我试图让我的示例与您的示例相似。实际上,为了使将来的更改更容易,我会使用可以按名称而不是元组索引的东西(因此您不限于仅将项目添加到末尾)。

您可以使用自己的类,也可以简单地使用collections.namedtuple来完成这项工作(如果您想稍后扩展已经使用元组来使用名称的代码,后者特别有用,允许逐步迁移)。

请记住(据我所知)您可以将任何可以腌制的东西传递给队列。

于 2013-01-12T10:42:50.493 回答
1
  • mp.Process可以给 s 一个name参数,myWait然后目标函数可以使用mp.current_process().name. 所以没有必要通过processNb
  • 将进程间通信保持在最低限度。无需通过队列传递格式化字符串,只需传递将在元组中更改的字符串部分:(name, wait, startedAt, endedAt)

所以,你可以用这样的一个队列来做到这一点:

import time
import multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put(
        (name, wait, startedAt, endedAt))


# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = '0', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = '1', args=(1, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' %
          (name, startedAt, wait, endedAt))
于 2013-01-12T10:41:32.057 回答