4

我是多处理的新手,我只是想在 Python 3.2 中编写一个简单的程序,它的计数器在一个线程中无限增加,而第二个线程检查第一个线程中是否已达到给定值。达到该值后,我希望关闭多处理线程,并让程序显示“处理完成”语句。

据我了解,该程序看起来像(给定值为 10):

import multiprocessing as mp


def Counter():
    i=1
    while i > 0:
        print("i: ",i)
        i+=1


def ValueTester(i):
    if i >= 10:
        *End Counter Function Thread*


if __name__ == '__main__':

    *Begin multiprocessing, one thread for "Counter" and a second for "ValueTester"*

    print("Process Complete")

对于伪代码的模糊性,我深表歉意;我已经阅读了 Python 文档以及几个不同的示例,但似乎找不到简单的解决方案。

此外,一旦这起作用了,我将如何设置给定的停止值(即,将变量传递给ValueTester,而不仅仅是使用10)?

非常感谢您的帮助。

4

2 回答 2

5

我们需要注意在这里明确区分线程和进程。

线程都在同一个进程下运行。访问的值可以在线程之间共享。只有当值受到保护时,线程才能以安全(协调)的方式更改值threading.Lock在它改变之前。在 CPython 中,Python 和 PyPy 最常见的实现,但与 Jython 或 Iron Python 等其他实现相比,GIL(全局解释器锁)可防止多个线程在任何给定时间运行。所以在 CPython 下,多个线程实际上是串行运行的,而不是同时运行的。尽管如此,多线程对于 I/O 密集型工作还是很有用的,例如查询许多网站,因为大部分时间都花在等待网络 (I/O) 活动上。因此,与诸如数学计算之类的 CPU 密集型任务相比,多个线程不必为争夺对单个 CPU 的访问权而等待太多。

现在说了这么多,您正在处理多个进程,而不是线程。进程相互独立。如果可用(包括在 CPython 下),它们可以并且确实在多个 CPU 上同时运行。当您生成一个进程时,全局值会从原始进程复制到生成的进程。在某些具有“写入时复制”的操作系统(如 Linux)上,这些值实际上在进程之间共享,直到一个进程试图覆盖一个值,此时该值被复制以独立于另一个进程。因此,当您修改值时,两个过程最终会得到两个名称相同但可以具有完全不同的值的变量

多处理模块提供了一些特殊的对象来促进 进程之间的共享状态。这些包括mp.Value, mp.Array, mp.Manager。请注意,当您使用这些对象时,您还必须使用 amp.Lock 来防止一个进程更改值,而另一个进程正在尝试做同样的事情。但是,锁也会减慢进程,因为必须等待锁被释放。

现在要在另一个进程中达到条件时向进程发出信号,请使用mp.Event

import multiprocessing as mp
import time

def Counter(i, event, lock):
    with lock:
        i.value = 1
    while i.value > 0 and not event.is_set():
        print("i: ", i.value)
        with lock:
            i.value += 1


def ValueTester(i, stopval, event):
    while True:
        if i.value >= stopval:
            event.set()
            break
        else:
            time.sleep(0.1)


if __name__ == '__main__':
    num = mp.Value('d', 0.0)
    # A lock is not absolutely necessary here since only one process modifies
    # num, but I'm including it since it is necessary (to avoid race conditions)
    # in the more usual case when multiple processes may modify num.
    lock = mp.Lock()
    event = mp.Event()
    counter = mp.Process(target=Counter, args=(num, event, lock))
    counter.start()
    tester = mp.Process(target=ValueTester, args=(num, 10, event))
    tester.start()
    tester.join()
    counter.join()
    print("Process Complete")

有关如何使用多处理的更多示例,请参阅Doug Hellman 的 Python Module of the Week 教程

于 2013-09-30T14:24:26.297 回答
1

所以你需要的是两个线程(进程)相互通信的一些机制。幸运的是,Python 的多处理模块为您提供了一些选项,其中之一是队列。

所以首先要做的是启动这两个def,并传入一个共享队列,它们可以用来通信。由于您希望主进程杀死子进程,因此第一个 proc 应该启动第二个。

import multiprocessing as mp
from multiprocessing import Queue

def counter(): #use lowercase c, 'Counter' is importable
    threshold = 10
    output = Queue(1) #for placing numbers on the queue
    input = Queue(1) #for looking for the signal that child is done
    child = Process(target=valuetester, args=(threshold, output, input))
    child.start()
    i=1
    while i > 0:
        output.put(i)
        print("i: ",i)
        i+=1 
        try:
            done = input.get_nowait()
            if done == 'Im Done!':
                print 'Process Complete!'
                child.join() #clean up the child proc
                return
        except Empty:
            pass #input is empty, no big deal


def valuetester(threshold, input, output):
    thresholdNotPassed = False
    while thresholdNotPassed:
        i = input.get()
        if i >= threshold:
            output.put('Im Done!')
            return 
        else:
            pass #nothing to do... just wait

#Start the counter proc. You could use the main process since its easier
if __name__ == 'main':
    counter()

需要注意的几件事:我使队列的最大大小为一......如果队列已满,这将做的是阻塞(保持在该行)。

你可以看到我还使用了一个 get_nowait() 作为主进程来检查子进程是否完成,否则使用 normalget会阻塞那里,我们会死锁!

于 2013-09-30T14:25:06.583 回答