0

我是并行处理的新手,但有一个对它有用的应用程序。我有大约 10-100k 对象实例(类型ClassA),我想使用多处理模块来分配在每个对象上调用特定类方法的工作。我已经阅读了大部分多处理文档和几篇关于调用类方法的帖子,但我还有一个额外的复杂情况是 ClassA 对象都有一个指向另一个类型的相同实例的属性(ClassB),他们可以添加/删除自己或其他对象。我知道共享状态对并发进程不利,所以我想知道这是否可能。老实说,代理/管理器多重处理方法让我难以理解它们对共享对象的所有影响,但如果有人向我保证我可以让它工作,我会花更多时间来理解它们。如果没有,这将是设计分布式流程的一个教训。

这是我的问题的简化版本:

ClassA:
    def __init__(self, classB_state1, classB_state2, another_obj):
        # Pointers to shared ClassB instances
        self.state1 = classB_state1
        self.state2 = classB_state2
        self.state1.add(self)
        self.object = another_obj

    def run(classB_anothercommonpool):
        # do something to self.object
        if #some property of self.object: 
            classB_anothercommonpool.add(object)
            self.object = None

        self.switch_states()

    def switch_states(self):
        if self in self.state1: 
            self.state1.remove(self)
            self.state2.add(self)

        elif self in self.state2:
            self.state2.remove(self)
            self.state1.add(self)

        else: 
            print "State switch failed!"

ClassB(set): 
# This is essentially a glorified set with a hash so I can have sets of sets.
# If that's a bad design choice, I'd also be interested in knowing why
    def __init__(self, name):
        self.name = name
        super(ClassB, self).__init__()

    def __hash__(self):
        return id(self)

ClassC:
    def __init__(self, property):
        self.property = property

# Define an import-able function for the ClassA method, for multiprocessing
def unwrap_ClassA_run(classA_instance):
    return classA_instance.run(classB_anothercommonpool)

def initialize_states():
    global state1
    global state2
    global anothercommonpool

    state1            = ClassB("state1")
    state2            = ClassB("state2")
    anothercommonpool = ClassB("objpool")

现在,在定义类的同一个 .py 文件中:

from multiprocessing import Pool

def test_multiprocessing():
    initialize_states()

    # There are actually 10-100k classA instances
    object1 = ClassC('iamred')  
    object2 = ClassC('iamblue')
    classA1 = ClassA(state1, state2, object1)
    classA2 = ClassA(state1, state2, object2)

    pool = Pool(processes = 2)
    pool.map(unwrap_ClassA_run, [classA1, classA2])

如果我在解释器中导入这个模块并运行 test_multiprocessing(),我在运行时不会出错,但是“切换状态失败!” 显示消息,如果您检查 classA1/2 对象,它们没有修改各自的 objects1/2,也没有切换任何 ClassB 状态对象的成员资格(因此 ClassA 对象不会注册它们是 state1 集合的成员)。谢谢!

4

0 回答 0