2

Python 3.1.2

我在multiprocessing.Process产生的两个线程之间共享变量时遇到问题。这是一个简单的布尔变量,它应该确定线程是否应该运行或应该停止执行。下面是在三种情况下显示的简化代码(但使用与我的原始代码相同的机制):

  1. threading.Thread 类型和 self.is_running bool 类型的主类beeing [工作正常]。
  2. multiprocess.Process 类型和 self.is_running bool 类型的主类beeing [不工作。子线程拥有 self.is_running 的本地副本,而不是共享它]。
  3. multiprocess.Process 类型和 self.is_running 的主类beeing 是 multiprocessing.Value("b", True) [工作正常] 类型。

我想了解为什么它以这种方式而不是另一种方式工作。(即为什么第 2 点没有像我假设的那样工作)。

测试是从 python 的解释器完成的:

from testclass import *

d = TestClass()
d.start()
d.stop()

以下是第 1 点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(threading.Thread):
def __init__(self):
    threading.Thread.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 2 点的示例:

import threading
import time
import queue
import multiprocessing


class Test(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 3 点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = multiprocessing.Value("b", True)
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running.value
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running.value = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()
4

2 回答 2

0

线程都是同一个进程的一部分,所以它们共享内存。另一个后果是线程不能被不同的 cpu 精确地同时执行,因为一个进程只能被一个 cpu 拾取。

进程有独立的内存空间。一个 cpu 可以运行一个进程,而另一个 cpu 可以同时运行另一个进程。需要特殊的结构来让流程合作。

于 2012-07-03T10:43:55.583 回答
0

在第 2 点中,父进程和子进程都有自己的is_running. 当你调用stop()父进程时,它只is_running在父进程中修改,而不在子进程中修改。multiprocessing.Value起作用的原因是它的内存在两个进程之间共享。

如果您想要一个进程感知队列,请使用multiprocessing.Queue

于 2012-07-03T16:15:36.767 回答