1

我试图在 python 中实现 Lmax。我试图在 4 个进程中处理数据

import disruptor  
import multiprocessing
import random

if __name__ == '__main__':

    cb = disruptor.CircularBuffer(5)

    def receiveWriter():
        while(True):
            n = random.randint(5,20)
            cb.receive(n)

    def ReplicatorReader():
        while(True):
            cb.replicator()

    def journalerReader():
        while(True):
            cb.journaler()

    def unmarshallerReader():
        while(True):
            cb.unmarshaller()

    def consumeReader():
        while(True):
            print(cb.consume())
    


  
    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()

    p0 = multiprocessing.Process(name="p0",target=receiveWriter)
    p0.start()
    
    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()

    p2 = multiprocessing.Process(name="p2",target=journalerReader)
    p2.start()

    p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
    p3.start()

    p4 = multiprocessing.Process(name="p4",target=consumeReader)
    p4.start()
   

但我的代码中出现此错误:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\\python\\RunDisruptor.py'>

您的第一个问题是调用的目标不能Processif __name__ == '__main__':块内。但:

正如我在您之前的一篇文章中提到的,我看到您可以CircularBuffer跨多个进程共享一个实例的唯一方法是实现一个托管类,令人惊讶的是,这并不难做到。但是当您创建托管类并创建该类的实例时,您所拥有的实际上是对该对象的代理引用。这有两个含义:

  1. 每个方法调用更像是对由您将启动的管理器创建的特殊服务器进程的远程过程调用,因此比本地方法调用具有更多开销。
  2. 如果打印引用,__str__则不会调用该类的方法;您将打印代理指针的表示。您可能应该将方法重命名__str__为类似的名称dump,并在需要实例表示时显式调用它。

您还应该明确地等待您正在创建的进程完成,以便管理器服务不会过早关闭,这意味着每个进程都应该分配给一个唯一的变量并具有唯一的名称。

import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random

class CircularBufferManager(BaseManager):
    pass

def receiveWriter(cb):
    while(True):
        n = random.randint(5,20)
        cb.receive(n)

def ReplicatorReader(cb):
    while(True):
        cb.replicator()

def journalerReader(cb):
    while(True):
        cb.journaler()

def unmarshallerReader(cb):
    while(True):
        cb.unmarshaller()

def consumeReader(cb):
    while(True):
        print(cb.consume())

if __name__ == '__main__':
    # Create managed class
    CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
    # create and start manager:
    with CircularBufferManager() as manager:
        cb = manager.CircularBuffer(5)

        p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
        p1.start()

        p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
        p0.start()

        p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
        p1a.start()

        p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
        p2.start()

        p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
        p3.start()

        p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
        p4.start()

        p1.join()
        p0.join()
        p1a.join()
        p2.join()
        p3.join()
        p4.join()
4

1 回答 1

1

您的第一个问题是调用的目标不能Processif __name__ == '__main__':块内。但:

正如我在您之前的一篇文章中提到的,我看到您可以CircularBuffer跨多个进程共享一个实例的唯一方法是实现一个托管类,令人惊讶的是,这并不难做到。但是当您创建托管类并创建该类的实例时,您所拥有的实际上是对该对象的代理引用。这有两个含义:

  1. 每个方法调用更像是对由您将启动的管理器创建的特殊服务器进程的远程过程调用,因此比本地方法调用具有更多开销。
  2. 如果打印引用,__str__则不会调用该类的方法;您将打印代理指针的表示。您可能应该将方法重命名__str__为类似的名称dump,并在需要实例表示时显式调用它。

您还应该明确地等待您正在创建的进程完成,以便管理器服务不会过早关闭,这意味着每个进程都应该分配给一个唯一的变量并具有一个唯一的名称。

import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random

class CircularBufferManager(BaseManager):
    pass

def receiveWriter(cb):
    while(True):
        n = random.randint(5,20)
        cb.receive(n)

def ReplicatorReader(cb):
    while(True):
        cb.replicator()

def journalerReader(cb):
    while(True):
        cb.journaler()

def unmarshallerReader(cb):
    while(True):
        cb.unmarshaller()

def consumeReader(cb):
    while(True):
        print(cb.consume())

if __name__ == '__main__':
    # Create managed class
    CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
    # create and start manager:
    with CircularBufferManager() as manager:
        cb = manager.CircularBuffer(5)

        p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
        p1.start()

        p0 = multiprocessing.Process(name="p0",target=receiveWriter, args=(cb,))
        p0.start()

        p1a = multiprocessing.Process(name="p1a",target=ReplicatorReader, args=(cb,))
        p1a.start()

        p2 = multiprocessing.Process(name="p2",target=journalerReader, args=(cb,))
        p2.start()

        p3 = multiprocessing.Process(name="p3",target=unmarshallerReader, args=(cb,))
        p3.start()

        p4 = multiprocessing.Process(name="p4",target=consumeReader, args=(cb,))
        p4.start()

        p1.join()
        p0.join()
        p1a.join()
        p2.join()
        p3.join()
        p4.join()
于 2021-10-11T13:06:55.923 回答