我想在我的服务器之间建立一个基于事件的系统。例如,当包装我的数据库逻辑的服务器更改状态时,我希望它通知我的其他服务器。发布/订阅设计似乎很适合这个,我听说过有关 ZeroRPC 的好消息。
有些人提到使用 zerorpc 流来完成发布/订阅,但是对我来说,使用流来触发事件如何工作并不明显。
我想在我的服务器之间建立一个基于事件的系统。例如,当包装我的数据库逻辑的服务器更改状态时,我希望它通知我的其他服务器。发布/订阅设计似乎很适合这个,我听说过有关 ZeroRPC 的好消息。
有些人提到使用 zerorpc 流来完成发布/订阅,但是对我来说,使用流来触发事件如何工作并不明显。
在 dotCloud,我们使用了大量的 pub/sub 槽 zerorpc 流。让我描述一下我们的做法。
我们公开了一个用@zerorpc.stream 装饰的流方法。此方法在调用时会将 gevent.queue 添加到集合中。然后该方法将永远循环,产生到达队列的每条消息。当此方法终止时(因为客户端已断开连接),队列将从集合中移除。
要发布,只需将要发布的消息发布到集合中注册的每个队列上。这时你必须决定你想对慢消费者做什么(断开他们,为他们排队到一定的限制和/或丢弃新消息)。
class MyService(object):
def __init__(self):
self._subscribers = set()
@zerorpc.stream
def subscribe(self):
try:
queue = gevent.queue.Queue()
self._subscribers.add(queue)
for msg in queue:
yield msg
finally:
self._subscribers.remove(queue)
subscribe 方法只是将事件队列添加到集合中。然后永远使用队列,直到: - 队列由 StopIteration 消息结束(参见 gevent.queue.Queue 文档) - 运行订阅功能的 greenlet 被杀死(通常是因为客户端断开连接)
在这两种情况下,都将执行 finally 语句,并将队列从订阅者列表中删除。
请注意,此时可以限制队列的大小:...Queue(maxsize=42)
.
class MyService(object):
[...]
def _publish(self, msg):
for queue in self._subscribers:
if queue.size < 42:
queue.put(msg)
调用此方法发布消息。它将遍历所有订阅者队列以将消息放入其中。在我的示例中,如果队列达到特定大小,我将丢弃该消息。但是你想在那里应用什么样的模式是没有限制的。
您可以将订阅者的 greenlet 实例存储在集合中,然后在队列已满时将其终止,从而有效地断开慢速客户端(您甚至可以尝试发送消息通知客户端太慢)。您还可以等待所有消费者并行处理消息,然后从 _publish 等返回。朋友,我的朋友没有限制!
希望有帮助!
ZeroRPC 现在有一套完全不同的发布者/订阅者功能,它可以在网络上完美运行!
您可能有兴趣阅读 ZeroRPC 测试以获取有关如何使用它的更多提示,在本例中为 Publisher 和 Subscriber 类。这是测试。
此外,ØMQ 文档中有很多关于发布者/订阅者模式的好信息等等。你可以在这里找到它。