严格来说,您的托管列表不在共享内存中,了解正在发生的事情非常重要。保存您的实例的实际列表Trade
驻留在执行Manager()
调用时创建的进程中。然后执行records = manager.list([Trade(5)])
时,records
不是对该列表的直接引用,因为正如我所说,我们不处理共享内存。相反,它是一个特殊的代理对象,它实现了与列表相同的方法,但是当你,例如,调用append
在这个代理对象上,它接受您尝试附加的参数并将其序列化,并通过套接字或管道将其传输到管理器的进程,在该进程中它被反序列化并附加到实际列表中。简而言之,对代理对象的操作变成了远程方法调用。
现在解决你的问题。您正在尝试order_id
使用以下语句重置属性:
self.trades[idx].order_id = 0
由于我们正在通过代理对象处理远程列表,因此不幸的是,上述语句等效于:
trade = self.trades[idx] # fetch object from the remote list
trade.order_id = 0 # reset the order_id to 0 on the local copy
缺少的是使用新更新的交易对象更新列表:
self.trades[idx] = trade
因此,您的单个更新语句确实需要替换为上述 3 语句序列。
我还冒昧地以多种方式修改了您的代码。
- Python 代码的PEP8 样式指南建议将类名大写。
- 由于您的所有流程类在构造方式上都是相同的(即具有相同
__init__
的方法),因此我创建了一个抽象基类,TestProcess
这些类继承自该基类。他们所要做的就是提供一种run
方法。
- 我已经制作了这些进程类守护程序类。这意味着它们将在主进程终止时自动终止。我这样做是为了演示目的,这样程序就不会无休止地循环。主进程将在 15 秒后终止。
- 您不需要将交易和锁定参数传递给类的
__init__
方法Process
。例如,如果您不是从中派生类,Process
而只是想让新创建的进程运行一个foo
接受参数trades和lock的函数,那么您可以指定p1 = Process(target=foo, args=(trades, lock))
. 这是args
参数的真正目的,即与目标参数一起使用。有关详细信息,请参阅threading.Thread
类文档。实际上,我实际上认为从中派生类的价值很小multiprocessing.Process
(不这样做会有更好的重用机会)。但既然你这样做了,你已经在你的__init__
方法设置实例属性self.trades
和,当您调用方法隐式调用您的方法self.lock
时将使用该属性。您无需再做任何事情。请参阅最后的两个附加代码示例。run
start
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod
class Trade:
def __init__(self, id):
self.exchange = None
self.order_id = id
class TestProcess(Process, ABC):
def __init__(self, trades, lock):
Process.__init__(self, daemon=True)
self.trades = trades
self.lock = lock
@abstractmethod
def run():
pass
class TestProcess2(TestProcess):
def run(self):
while True:
# lock.acquire()
print("Altering")
for idx in range(len(self.trades)):
trade = self.trades[idx]
trade.order_id = 0
# We must tell the managed list that it has been updated!!!:
self.trades[idx] = trade
# lock.release()
sleep(1)
class TestProcess1(TestProcess):
def run(self):
while True:
print("start")
for idx in range(len(self.trades)):
print(f'index = {idx}, order id = {self.trades[idx].order_id}')
sleep(1)
class TestProcess(TestProcess):
def run(self):
while True:
# lock.acquire()
n = random.randint(0, 9)
print("adding random {}".format(n))
self.trades.append(Trade(n))
# lock.release()
# print(trades)
sleep(5)
if __name__ == "__main__":
with Manager() as manager:
records = manager.list([Trade(5)])
lock = Lock()
p1 = TestProcess(records, lock)
p1.start()
p2 = TestProcess1(records, lock)
p2.start()
p3 = TestProcess2(records, lock)
p3.start()
sleep(15) # run for 15 seconds
使用非派生类multiprocessing.Process
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod
class Trade:
def __init__(self, id):
self.exchange = None
self.order_id = id
class TestProcess(ABC):
def __init__(self, trades, lock):
self.trades = trades
self.lock = lock
@abstractmethod
def process():
pass
class TestProcess2(TestProcess):
def process(self):
while True:
# lock.acquire()
print("Altering")
for idx in range(len(self.trades)):
trade = self.trades[idx]
trade.order_id = 0
# We must tell the managed list that it has been updated!!!:
self.trades[idx] = trade
# lock.release()
sleep(1)
class TestProcess1(TestProcess):
def process(self):
while True:
print("start")
for idx in range(len(self.trades)):
print(f'index = {idx}, order id = {self.trades[idx].order_id}')
sleep(1)
class TestProcess(TestProcess):
def process(self):
while True:
# lock.acquire()
n = random.randint(0, 9)
print("adding random {}".format(n))
self.trades.append(Trade(n))
# lock.release()
# print(trades)
sleep(5)
if __name__ == "__main__":
with Manager() as manager:
records = manager.list([Trade(5)])
lock = Lock()
tp = TestProcess(records, lock)
p1 = Process(target=tp.process, daemon=True)
p1.start()
tp1 = TestProcess1(records, lock)
p2 = Process(target=tp1.process, daemon=True)
p2.start()
tp2 = TestProcess2(records, lock)
p3 = Process(target=tp2.process, daemon=True)
p3.start()
sleep(15) # run for 15 seconds
使用函数而不是派生自的类multiprocessing.Process
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
class Trade:
def __init__(self, id):
self.exchange = None
self.order_id = id
def testprocess2(trades, lock):
while True:
# lock.acquire()
print("Altering")
for idx in range(len(trades)):
trade = trades[idx]
trade.order_id = 0
# We must tell the managed list that it has been updated!!!:
trades[idx] = trade
# lock.release()
sleep(1)
def testprocess1(trades, lock):
while True:
print("start")
for idx in range(len(trades)):
print(f'index = {idx}, order id = {trades[idx].order_id}')
sleep(1)
def testprocess(trades, lock):
while True:
# lock.acquire()
n = random.randint(0, 9)
print("adding random {}".format(n))
trades.append(Trade(n))
# lock.release()
# print(trades)
sleep(5)
if __name__ == "__main__":
with Manager() as manager:
records = manager.list([Trade(5)])
lock = Lock()
p1 = Process(target=testprocess, args=(records, lock), daemon=True)
p1.start()
p2 = Process(target=testprocess1, args=(records, lock), daemon=True)
p2.start()
p3 = Process(target=testprocess2, args=(records, lock), daemon=True)
p3.start()
sleep(15) # run for 15 seconds