我想实现一个轻量级的消息队列代理。它的工作是从 Web 应用程序 (PHP) 接收消息并将它们异步发送到 Message Queue 服务器。使用此代理的原因是 MQ 并不总是可用,有时会滞后甚至关闭,但我想确保消息已传递,并且 Web 应用程序立即返回。
因此,PHP 会将消息发送到在同一主机上运行的 MQ 代理。如果发生崩溃,该代理会将消息保存到 SQLite 以保持持久性。同时,当连接可用时,它会从 SQLite 将消息批量发送到 MQ,并从 SQLite 中删除它们。
现在,据我了解,此服务中有以下组件:
- 消息监听器(监听来自 PHP 的消息并将它们写入传入队列)
- DB flusher(从传入队列读取消息并将它们保存到数据库;由于 SQLite 单线程)
- MQ 连接处理程序(通过重新连接保持与 MQ 服务器的连接在线)
- 消息发送者(从 SQlite db 收集消息并将它们发送到 MQ 服务器,然后从 db 中删除它们)
我正在考虑将 Twisted 用于#1(TCPServer),但我在将它与其他非事件驱动的点集成时遇到了问题。直觉告诉我,这些点中的每一个都应该在一个单独的线程中运行,因为它们都是 IO 绑定的并且彼此独立,但我可以轻松地将它们放在一个线程中。尽管如此,除了 Twisted 的主循环之外,我找不到任何关于如何实现此工作线程的好且清晰的(对我而言)示例。
我开始的例子是chatserver.py,它使用 service.Application 和 internet.TCPServer 对象。如果我在创建 TCPServer 服务之前启动自己的线程,它会运行几次,但它会停止并且不再运行。我不确定为什么会发生这种情况,但这可能是因为我没有正确使用 Twisted 线程。
关于如何实现单独的工作线程并保持 Twisted 的任何建议?您有任何替代架构吗?