2

我想实现一个轻量级的消息队列代理。它的工作是从 Web 应用程序 (PHP) 接收消息并将它们异步发送到 Message Queue 服务器。使用此代理的原因是 MQ 并不总是可用,有时会滞后甚至关闭,但我想确保消息已传递,并且 Web 应用程序立即返回。

因此,PHP 会将消息发送到在同一主机上运行的 MQ 代理。如果发生崩溃,该代理会将消息保存到 SQLite 以保持持久性。同时,当连接可用时,它会从 SQLite 将消息批量发送到 MQ,并从 SQLite 中删除它们。

现在,据我了解,此服务中有以下组件:

  1. 消息监听器(监听来自 PHP 的消息并将它们写入传入队列)
  2. DB flusher(从传入队列读取消息并将它们保存到数据库;由于 SQLite 单线程)
  3. MQ 连接处理程序(通过重新连接保持与 MQ 服务器的连接在线)
  4. 消息发送者(从 SQlite db 收集消息并将它们发送到 MQ 服务器,然后从 db 中删除它们)

我正在考虑将 Twisted 用于#1(TCPServer),但我在将它与其他非事件驱动的点集成时遇到了问题。直觉告诉我,这些点中的每一个都应该在一个单独的线程中运行,因为它们都是 IO 绑定的并且彼此独立,但我可以轻松地将它们放在一个线程中。尽管如此,除了 Twisted 的主循环之外,我找不到任何关于如何实现此工作线程的好且清晰的(对我而言)示例。

我开始的例子是chatserver.py,它使用 service.Application 和 internet.TCPServer 对象。如果我在创建 TCPServer 服务之前启动自己的线程,它会运行几次,但它会停止并且不再运行。我不确定为什么会发生这种情况,但这可能是因为我没有正确使用 Twisted 线程。

关于如何实现单独的工作线程并保持 Twisted 的任何建议?您有任何替代架构吗?

4

3 回答 3

4

您基本上是在考虑为您的消息传递服务器编写一个临时扩展,其工作是提供您所要求的任何可靠性保证。

相反,也许您应该使用您计划运行此新代理的硬件并在其上运行另一个 MQ 节点。新节点应该负责在其他节点超载或离线时持久化和中继您传递给它的消息。

于 2010-06-01T18:32:36.797 回答
1

也许在 Twisted 中使用单独的线程来绕过阻塞调用并不是最划算的,但有时最不邪恶的解决方案是最好的。这是一个链接,向您展示如何将线程集成到 Twisted:

http://twistedmatrix.com/documents/10.1.0/core/howto/threading.html

有时在紧要关头,易于实施的速度比数小时/数天的研究要快,而这些研究可能都证明是徒劳的。

于 2010-10-31T20:54:51.007 回答
0

这个问题的一个巧妙的解决方案是使用键值存储Redis。它是一个高速持久数据存储,有很多客户端——它有一个 php 和一个 python 客户端(如果你想使用定时/批处理过程来处理消息——它可以节省你创建数据库的时间,还可以处理你的持久性故事. 它在 Cywin/Windows + posix 环境下运行良好。

PHP Redis 客户端在这里

Python 客户端在这里

两者都有一个非常干净和简单的 API。如果您需要,Redis 还提供了发布/订阅机制,尽管如果您要发布到不一致的队列,这听起来价值有限。

于 2010-06-02T08:55:33.537 回答