2

所以我一直在努力解决这个让我发疯的泡菜错误。我有以下带有以下代码的主引擎类:

import eventlet
import socketio
import multiprocessing
from multiprocessing import Queue
from multi import SIOSerever

class masterEngine:

    if __name__ == '__main__': 
            
        serverObj = SIOSerever()

        try:
            receiveData = multiprocessing.Process(target=serverObj.run)
            receiveData.start()

            receiveProcess = multiprocessing.Process(target=serverObj.fetchFromQueue)
            receiveProcess.start()

            receiveData.join()
            receiveProcess.join()
            
        except Exception as error:
            print(error)

我还有另一个名为 multi 的文件,其运行如下:

import multiprocessing
from multiprocessing import Queue
import eventlet
import socketio

class SIOSerever:

  def __init__(self):
    self.cycletimeQueue = Queue()
    self.sio = socketio.Server(cors_allowed_origins='*',logger=False)
    self.app = socketio.WSGIApp(self.sio, static_files={'/': 'index.html',})
    self.ws_server = eventlet.listen(('0.0.0.0', 5000))

    @self.sio.on('production')
    def p_message(sid, message):
      self.cycletimeQueue.put(message)
      print("I logged : "+str(message))

  def run(self):
    eventlet.wsgi.server(self.ws_server, self.app)

  def fetchFromQueue(self):
    while True:
      cycle = self.cycletimeQueue.get()
      print(cycle)

如您所见,我可以尝试创建两个我想独立运行的 def run 和 fetchFromQueue 进程。

我的 run 函数启动 python-socket 服务器,我从 html 网页发送一些数据到该服务器(无需多处理即可完美运行)。然后我尝试将接收到的数据推送到队列,以便我的其他函数可以检索它并使用接收到的数据。

我需要对从套接字接收到的数据执行一组耗时操作,这就是我将其全部推入队列的原因。

在运行主引擎类时,我收到以下信息:

Can't pickle <class 'threading.Thread'>: it's not the same object as threading.Thread
I ended!
[Finished in 0.5s]

你能帮我解决我做错了什么吗?

4

1 回答 1

0

从多处理编程指南

将资源显式传递给子进程

在使用 fork start 方法的 Unix 上,子进程可以使用在使用全局资源的父进程中创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程还活着,该对象就不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾收集时释放了某些资源,这可能很重要。

因此,我通过删除所有不必要的内容来稍微修改您的示例,但展示了一种将共享队列显式传递给使用它的所有进程的方法:

import multiprocessing

MAX = 5

class SIOSerever:

  def __init__(self, queue):
    self.cycletimeQueue = queue

  def run(self):
    for i in range(MAX):
      self.cycletimeQueue.put(i)

  @staticmethod
  def fetchFromQueue(cycletimeQueue):
    while True:
      cycle = cycletimeQueue.get()
      print(cycle)
      if cycle >= MAX - 1:
        break


def start_server(queue):
    server = SIOSerever(queue)
    server.run()


if __name__ == '__main__':

    try:
        queue = multiprocessing.Queue()
        receiveData = multiprocessing.Process(target=start_server, args=(queue,))
        receiveData.start()

        receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
        receiveProcess.start()

        receiveData.join()
        receiveProcess.join()

    except Exception as error:
        print(error)
0
1
...
于 2020-12-17T14:13:21.313 回答