0

我是 python 多处理的新手,关于以下代码的背景。我正在尝试创建三个进程,一个将元素添加到列表中,一个用于修改列表中的元素,一个用于打印列表。

理想情况下,这三个进程使用共享内存中的同一个列表,使用manager.

我面临的问题testprocess2是无法将值设置为0,基本上,它无法更改列表。

class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


class testprocess2(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(self.trades)):
                self.trades[idx].order_id = 0
            # lock.release()
            sleep(1)


class testprocess1(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            print("start")
            for idx in range(len(self.trades)):
                print(self.trades[idx].order_id)

            sleep(1)


class testprocess(Process):
    def __init__(self, trades, lock):
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            # print(trades)
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = testprocess(records, lock)
        p1.start()

        p2 = testprocess1(records, lock)
        p2.start()

        p3 = testprocess2(records, lock)
        p3.start()

        p1.join()
        p2.join()
        p3.join()
4

1 回答 1

0

严格来说,您的托管列表不在共享内存中,了解正在发生的事情非常重要保存您的实例的实际列表Trade 驻留在执行Manager()调用时创建的进程中。然后执行records = manager.list([Trade(5)])时,records不是对该列表的直接引用,因为正如我所说,我们不处理共享内存。相反,它是一个特殊的代理对象,它实现了与列表相同的方法,但是当你,例如,调用append在这个代理对象上,它接受您尝试附加的参数并将其序列化,并通过套接字或管道将其传输到管理器的进程,在该进程中它被反序列化并附加到实际列表中。简而言之,对代理对象的操作变成了远程方法调用。

现在解决你的问题。您正在尝试order_id使用以下语句重置属性:

self.trades[idx].order_id = 0

由于我们正在通过代理对象处理远程列表,因此不幸的是,上述语句等效于:

trade = self.trades[idx] # fetch object from the remote list
trade.order_id = 0 # reset the order_id to 0 on the local copy

缺少的是使用新更新的交易对象更新列表:

self.trades[idx] = trade

因此,您的单个更新语句确实需要替换为上述 3 语句序列。

我还冒昧地以多种方式修改了您的代码。

  1. Python 代码的PEP8 样式指南建议将类名大写。
  2. 由于您的所有流程类在构造方式上都是相同的(即具有相同__init__的方法),因此我创建了一个抽象基类,TestProcess这些类继承自该基类。他们所要做的就是提供一种run方法。
  3. 我已经制作了这些进程类守护程序类。这意味着它们将在主进程终止时自动终止。我这样做是为了演示目的,这样程序就不会无休止地循环。主进程将在 15 秒后终止。
  4. 您不需要将交易锁定参数传递给类的__init__方法Process。例如,如果您不是从中派生类,Process而只是想让新创建的进程运行一个foo接受参数tradeslock的函数,那么您可以指定p1 = Process(target=foo, args=(trades, lock)). 这是args参数的真正目的,即与目标参数一起使用。有关详细信息,请参阅threading.Thread类文档。实际上,我实际上认为从中派生类的价值很小multiprocessing.Process(不这样做会有更好的重用机会)。但既然你这样做了,你已经在你的__init__方法设置实例属性self.trades和,当您调用方法隐式调用您的方法self.lock时将使用该属性。您无需再做任何事情。请参阅最后的两个附加代码示例。runstart
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


class TestProcess(Process, ABC):
    def __init__(self, trades, lock):
        Process.__init__(self, daemon=True)
        self.trades = trades
        self.lock = lock

    @abstractmethod
    def run():
        pass

class TestProcess2(TestProcess):
    def run(self):
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(self.trades)):
                trade = self.trades[idx]
                trade.order_id = 0
                # We must tell the managed list that it has been updated!!!:
                self.trades[idx] = trade
            # lock.release()
            sleep(1)


class TestProcess1(TestProcess):
    def run(self):
        while True:
            print("start")
            for idx in range(len(self.trades)):
                print(f'index = {idx}, order id = {self.trades[idx].order_id}')

            sleep(1)


class TestProcess(TestProcess):
    def run(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            # print(trades)
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = TestProcess(records, lock)
        p1.start()

        p2 = TestProcess1(records, lock)
        p2.start()

        p3 = TestProcess2(records, lock)
        p3.start()

        sleep(15) # run for 15 seconds

使用非派生类multiprocessing.Process

from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


class TestProcess(ABC):
    def __init__(self, trades, lock):
        self.trades = trades
        self.lock = lock

    @abstractmethod
    def process():
        pass

class TestProcess2(TestProcess):
    def process(self):
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(self.trades)):
                trade = self.trades[idx]
                trade.order_id = 0
                # We must tell the managed list that it has been updated!!!:
                self.trades[idx] = trade
            # lock.release()
            sleep(1)


class TestProcess1(TestProcess):
    def process(self):
        while True:
            print("start")
            for idx in range(len(self.trades)):
                print(f'index = {idx}, order id = {self.trades[idx].order_id}')

            sleep(1)


class TestProcess(TestProcess):
    def process(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            # print(trades)
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        tp = TestProcess(records, lock)
        p1 = Process(target=tp.process, daemon=True)
        p1.start()

        tp1 = TestProcess1(records, lock)
        p2 = Process(target=tp1.process, daemon=True)
        p2.start()

        tp2 = TestProcess2(records, lock)
        p3 = Process(target=tp2.process, daemon=True)
        p3.start()

        sleep(15) # run for 15 seconds

使用函数而不是派生自的类multiprocessing.Process

from multiprocessing import Process, Manager, Lock
from time import sleep
import random


class Trade:
    def __init__(self, id):
        self.exchange = None
        self.order_id = id


def testprocess2(trades, lock):
    while True:
        # lock.acquire()
        print("Altering")
        for idx in range(len(trades)):
            trade = trades[idx]
            trade.order_id = 0
            # We must tell the managed list that it has been updated!!!:
            trades[idx] = trade
        # lock.release()
        sleep(1)


def testprocess1(trades, lock):
    while True:
        print("start")
        for idx in range(len(trades)):
            print(f'index = {idx}, order id = {trades[idx].order_id}')

        sleep(1)


def testprocess(trades, lock):
    while True:
        # lock.acquire()
        n = random.randint(0, 9)
        print("adding random {}".format(n))
        trades.append(Trade(n))
        # lock.release()
        # print(trades)
        sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = Process(target=testprocess, args=(records, lock), daemon=True)
        p1.start()

        p2 = Process(target=testprocess1, args=(records, lock), daemon=True)
        p2.start()

        p3 = Process(target=testprocess2, args=(records, lock), daemon=True)
        p3.start()

        sleep(15) # run for 15 seconds
于 2022-02-20T12:31:46.280 回答