所以我正在阅读这篇关于如何在 ZMQ 中为 ( X
) PUB
/( X
)SUB
消息传递创建代理/代理的文章。有这样一张漂亮的图片,描绘了建筑应该是什么样子:
但是当我查看XSUB
套接字描述时,我不知道如何通过它转发所有订阅,因为它Outgoing routing strategy
是 N/A
那么如何在 ZeroMQ 中实现(取消)订阅转发,这种转发应用程序的最小用户代码是什么(可以插入简单的发布者和订阅者样本之间的代码)?
XPUB 确实接收消息——它接收的唯一消息是来自已连接订阅者的订阅,并且这些消息应该通过 XSUB 按原样向上游转发。
中继消息的最简单方法是使用zmq_proxy:
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)
它将向/从 xpub 和 xsub 中继消息。或者,您可以添加一个 PUB 套接字来监控通过任一方向的流量。
如果您希望中间的用户代码实现额外的路由逻辑,您可以执行以下操作,重新实现内部循环zmq_proxy
:
def broker(ctx):
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
poller = zmq.Poller()
poller.register(xpub, zmq.POLLIN)
poller.register(xsub, zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if xpub in events:
message = xpub.recv_multipart()
print "[BROKER] subscription message: %r" % message[0]
xsub.send_multipart(message)
if xsub in events:
message = xsub.recv_multipart()
# print "publishing message: %r" % message
xpub.send_multipart(message)
# insert user code here