12

所以我正在阅读这篇关于如何在 ZMQ 中为 ( X) PUB/( X)SUB消息传递创建代理/代理的文章。有这样一张漂亮的图片,描绘了建筑应该是什么样子:

数据流:用户码->PUB->XSUB->用户码->XPUB->SUB->用户码; 订阅流程:用户代码 <- PUB <- XSUB <- 用户代码 <- XPUB <- SUB <- 用户代码;

但是当我查看XSUB套接字描述时,我不知道如何通过它转发所有订阅,因为它Outgoing routing strategyN/A

那么如何在 ZeroMQ 中实现(取消)订阅转发,这种转发应用程序的最小用户代码是什么(可以插入简单的发布者和订阅者样本之间的代码)?

4

1 回答 1

18

XPUB 确实接收消息——它接收的唯一消息是来自已连接订阅者的订阅,并且这些消息应该通过 XSUB 按原样向上游转发。

中继消息的最简单方法是使用zmq_proxy

xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)

它将向/从 xpub 和 xsub 中继消息。或者,您可以添加一个 PUB 套接字来监控通过任一方向的流量。

如果您希望中间的用户代码实现额外的路由逻辑,您可以执行以下操作,重新实现内部循环zmq_proxy

def broker(ctx):
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while True:
        events = dict(poller.poll(1000))
        if xpub in events:
            message = xpub.recv_multipart()
            print "[BROKER] subscription message: %r" % message[0]
            xsub.send_multipart(message)
        if xsub in events:
            message = xsub.recv_multipart()
            # print "publishing message: %r" % message
            xpub.send_multipart(message)

        # insert user code here

完整的工作(Python)示例

于 2013-01-29T21:21:12.070 回答