3

我有一个名为 Experiment 的课程和另一个名为 Case 的课程。一项实验由许多个案组成。请参阅下面的类定义,

from multiprocessing import Process

class Experiment (object):
    def __init__(self, name):
        self.name = name
        self.cases = []
        self.cases.append(Case('a'))
        self.cases.append(Case('b'))
        self.cases.append(Case('c'))

    def sr_execute(self):
        for c in self.cases:  
            c.setVars(6)

class Case(object):
    def __init__(self, name):
        self.name = name
    def setVars(self, var):
        self.var = var

在我的实验课中,我有一个名为 sr_execute 的函数。此函数显示所需的行为。我有兴趣解析所有案例并为每个案例设置一个属性。当我运行以下代码时,

if __name__ == '__main__':
    #multiprocessing.freeze_support()
    e = Experiment('exp')
    e.sr_execute()
    for c in e.cases: print c.name, c.var

我明白了,

a 6
b 6
c 6

这是期望的行为。

但是,我想使用多处理并行执行此操作。为此,我在实验类中添加了一个 mp_execute() 函数,

    def mp_execute(self):
        processes = []
        for c in self.cases: 
            processes.append(Process(target= c.setVars, args = (6,)))
        [p.start() for p in processes]
        [p.join() for p in processes]

但是,这不起作用。当我执行以下操作时,

if __name__ == '__main__':
    #multiprocessing.freeze_support()
    e = Experiment('exp')
    e.mp_execute()
    for c in e.cases: print c.name, c.var

我得到一个错误,

AttributeError: 'Case' object has no attribute 'var'

显然,我无法使用多处理设置类属性。

任何线索是怎么回事,

4

2 回答 2

3

你打电话时:

def mp_execute(self):
    processes = []
    for c in self.cases: 
        processes.append(Process(target= c.setVars, args = (6,)))
    [p.start() for p in processes]
    [p.join() for p in processes]

当您创建Process它时,它将使用您的对象的副本,并且对此类对象的修改不会传递给主程序,因为不同的进程具有不同的地址空间。如果您使用 s 它将起作用,Thread因为在这种情况下不会创建任何副本。

另请注意,您的代码在 Windows 中可能会失败,因为您正在传递一个方法,target并且 Windows 要求 是target可挑选的(并且实例方法是不可挑选的)。target应该是在模块顶层定义的函数,以便在所有 Oses 上工作。

如果您想与主流程沟通,您可以:

  • 使用 aQueue传递结果
  • 使用 aManager构建共享对象

无论如何,您必须通过设置“通道”(如 a Queue)或设置共享状态来“明确地”处理通信。


样式说明:不要这种方式使用列表理解:

[p.join() for p in processes]

这是完全错误的。您只是在浪费空间创建Nones 列表。与正确的方法相比,它也可能更慢:

for p in processes:
    p.join()

因为它必须将元素附加到列表中。

有人说列表理解比for循环稍快,但是:

  • 性能上的差异是如此之小,以至于它通常无关紧要
  • 当且仅当您考虑这种循环时,它们才会更快:

    a = []
    for element in something:
        a.append(element)
    

    如果循环,就像在这种情况下,不创建a list,那么for循环会更快。

顺便说一句:有些使用map相同的方式来执行副作用。这又是错误的,因为与以前相同的原因,您不会获得太多的速度,并且map它在返回迭代器的 python3 中完全失败,因此它根本不会执行函数,从而使代码的可移植性降低。

于 2013-11-07T07:48:14.433 回答
2

@Bakuriu 的回答提供了很好的样式和效率建议。确实,每个进程都会获得主进程堆栈的副本,​​因此除非您使用某种形式的 IPC(例如队列、管道、管理器),否则分叉进程所做的更改不会反映在主进程的地址空间中。

但是您遇到的特定AttributeError: 'Case' object has no attribute 'var'错误还有一个原因,即您的 Case 对象在您启动流程时还没有该属性。var相反,var属性是在setVars()方法中创建的。

您的分叉进程在调用时确实会创建变量setVars()(实际上甚至将其设置为 6),但是可惜,这种更改仅在 Case 对象的副本中,即没有反映在主进程的内存空间中(变量仍然存在不存在)。

要了解我的意思,请将您的 Case 类更改为:

class Case(object):
    def __init__(self, name):
        self.name = name
        self.var = 7  # Create var in the constructor.
    def setVars(self, var):
        self.var = var

通过var在构造函数中添加成员变量,您的主进程将可以访问它。当然,fork 进程中的变化仍然不会反映在 master 进程中,但至少你不会得到错误:

a 7
b 7
c 7

希望这能阐明正在发生的事情。=)


解决方案:

最少侵入(对原始代码)的事情是使用共享内存中的 ctypes 对象:

from multiprocessing import Value
class Case(object):
    def __init__(self, name):
        self.name = name
        self.var = Value('i', 7)              # Use ctypes "int" from shared memory.
    def setVars(self, var):
        self.var.value = var                  # Set the variable's "value" attribute.

并将您的 main() 更改为打印 c.var.value:

for c in e.cases: print c.name, c.var.value   # Print the "value" attribute.

现在您有了所需的输出:

a 6
b 6
c 6
于 2013-11-08T16:17:07.707 回答