3

我有个主意。编写一个基于 WebSocket 的 RPC,它将根据以下场景处理消息。

  1. 客户端连接到 WS(Web 套接字)服务器
  2. 客户端向 WS 服务器发送消息
  3. WS 服务器将消息放入传入队列(可以是 multiprocessing.Queue 或 RabbitMQ 队列)
  4. 进程池中的一名工作人员拿起消息进行处理
  5. 正在处理消息(可能非常快或非常慢 - 它与 WS 服务器无关)
  6. 消息处理完毕后,将处理结果推送到传出队列
  7. WS 服务器从队列中弹出结果并发送给客户端

注意:关键是 WS 服务器应该是非阻塞的,并且只负责:

  • 连接接受
  • 从客户端获取消息并将它们放入传入队列
  • 从传出队列中弹出消息并将它们发送回客户端

注意2:以某种方式存储客户端标识符并将其与来自客户端的消息一起传递可能是个好主意

注意3:由于消息来回排队,简单消息处理的速度(例如获取消息作为输入并作为结果将其推回)会变慢,这是完全可以的目标是能够以与处理快速消息相同的代码风格在池中运行处理器昂贵的操作(粗略的非实际示例:几个嵌套的“for”循环)。即从输入队列中弹出消息以及某种客户端标识符,对其进行处理(可能需要一段时间)并将处理结果与客户端 ID 一起推送到输出队列。

问题:

  • 在 TornadoWeb 中,如果我有一个队列(多处理或 Rabit),我怎样才能让 Tornado 的 IOLoop 在该队列中有新项目时触发一些回调?如果有的话,你能帮我导航到一些现有的实现吗?
  • 这种设计有现成的实现吗?(不一定是 Tornado)
  • 也许我应该使用另一种语言(不是 python)来实现这样的设计?

致谢:

  • 不欢迎使用 REST 和 WSGI 来实现我的目标
  • 诸如“这是我通过谷歌搜索 2 秒找到的代码的链接。它有一些来自龙卷风和多处理的进口。我不确定它是做什么的,但我 99% 确信它正是你需要的”也不受欢迎
  • 使用异步库而不是普通阻塞库的建议是...... :)
4

1 回答 1

1

TornadoIOLoop允许您通过文件描述符处理来自任何文件对象的事件,因此您可以尝试以下操作:

  • 通过multiprocessing.Pipe
  • 调用add_handler每个管道的父端(使用连接的fileno()
  • 让工作人员每次将某些内容放入输出队列时都会写入一些随机垃圾,无论那是否multiprocessing.Queue属于任何 MQ。
  • 在事件处理程序中处理来自工作人员的答案
于 2012-08-16T19:45:55.833 回答