这是基于与其他答案中提出的不同方法的问题的解决方案。它使用与对象的消息传递multiprocessing.Queue
(而不是与对象共享内存multiprocessing.Value
)和过程安全(原子)内置递增和递减运算符+=
和-=
(而不是引入自定义increment
和decrement
方法),因为你要求它。
首先,我们定义一个类Subject
来实例化一个对象,该对象将是父进程的本地对象,并且其属性将被递增或递减:
import multiprocessing
import os
class Subject:
def __init__(self):
self.x = 0
self.y = 0
然后我们定义一个Proxy
用于实例化对象的类,该对象将成为远程代理,子进程将通过该代理请求父进程检索或更新Subject
对象的属性。进程间通信将使用两个multiprocessing.Queue
属性,一个用于交换请求,一个用于交换响应。请求的形式(pid, action, *args)
是pid
子进程标识符、action
操作('get'
、'set'
、'increment'
或'decrement'
属性值),并且*args
是目标属性和操作数。响应的形式value
(对'get'
请求):
class Proxy(Subject):
def __init__(self, requests, responses):
self.__requests = requests
self.__responses = responses
def __getter(self, target):
self.__requests.put((os.getpid(), 'get', target))
value = self.__responses.get()
return Decorator(value)
def __setter(self, value, target):
action = getattr(value, 'action', 'set')
self.__requests.put((os.getpid(), action, target, value))
@property
def x(self):
return self.__getter('x')
@property
def y(self):
return self.__getter('y')
@x.setter
def x(self, value):
self.__setter(value, 'x')
@y.setter
def y(self, value):
self.__setter(value, 'y')
def stop(self):
self.__requests.put((os.getpid(), 'stop'))
然后我们定义一个类Decorator
来装饰一个int
对象的getter返回的Proxy
对象,以便通过添加一个属性来通知它的setter是递增还是递减运算符+=
并且-=
已经被使用action
,在这种情况下setter请求一个'increment'
or'decrement'
操作而不是一个'set'
操作. 递增和递减运算符+=
并-=
调用相应的增强赋值特殊方法__iadd__
,__isub__
如果它们被定义,则回退到赋值特殊方法__add__
,__sub__
并且始终为int
对象定义(例如proxy.x += value
,等价于proxy.x = proxy.x.__iadd__(value)
哪个等价于proxy.x = type(proxy).x.__get__(proxy).__iadd__(value)
哪个等价于type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy).__iadd__(value))
):
class Decorator(int):
def __iadd__(self, other):
value = Decorator(other)
value.action = 'increment'
return value
def __isub__(self, other):
value = Decorator(other)
value.action = 'decrement'
return value
然后我们定义worker
将在子进程中运行的函数并请求递增和递减操作:
def worker(proxy):
proxy.x += 1
proxy.y -= 1
proxy.stop()
最后我们编写只在父进程中运行的代码:
if __name__ == '__main__':
requests = multiprocessing.Queue()
responses = multiprocessing.Queue()
subject = Subject()
proxy = Proxy(requests, responses)
processes = [
multiprocessing.Process(target=worker, args=(proxy,)) for _ in range(4)
]
running = len(processes)
for process in processes:
process.start()
while running > 0:
pid, action, *args = requests.get()
print(pid, action, *args)
if action == 'stop':
running -= 1
elif action == 'get':
responses.put(getattr(subject, args[0]))
elif action == 'set':
setattr(subject, args[0], args[1])
elif action == 'increment':
setattr(subject, args[0], getattr(subject, args[0]) + args[1])
elif action == 'decrement':
setattr(subject, args[0], getattr(subject, args[0]) - args[1])
for process in processes:
process.join()
assert subject.x == 4
assert subject.y == -4
最后的断言确保了这一点+=
并且-=
是过程安全的。您可以通过注释相应的__iadd__
or来删除过程安全__isub__
性Decorator
并观察相应的断言失败(例如proxy.x += value
,相当于proxy.x = proxy.x.__iadd__(value)
但回退到proxy.x = proxy.x.__add__(value)
if__iadd__
未定义,相当于proxy.x = proxy.x + value
which 相当于proxy.x = type(proxy).x.__get__(proxy) + value
which 相当于type(proxy).x.__set__(proxy, type(proxy).x.__get__(proxy) + value)
,所以action
属性不是添加并且设置器请求'set'
操作而不是'increment'
操作)。
流程安全会话示例(原子+=
和-=
):
35845 get x
35844 get x
35845 increment x 1
35845 get y
35844 increment x 1
35845 decrement y 1
35844 get y
35845 stop
35844 decrement y 1
35844 stop
35846 get x
35846 increment x 1
35846 get y
35846 decrement y 1
35846 stop
35847 get x
35847 increment x 1
35847 get y
35847 decrement y 1
35847 stop
示例进程不安全会话(非原子+=
和-=
):
51101 get x
51101 set x 1
51101 get y
51101 set y -1
51101 stop
51100 get x
51099 get x
51100 set x 2
51100 get y
51099 set x 2
51099 get y
51100 set y -2
51099 set y -2
51100 stop
51099 stop
51102 get x
51102 set x 3
51102 get y
51102 set y -3
51102 stop
Traceback (most recent call last):
File "/Users/maggyero/Desktop/foo.py", line 92, in <module>
assert subject.x == 4
AssertionError