1

我的用例:

  1. 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
  2. Publishers 将在不同的线程中初始化为客户端(连接到端口)。
  3. 在每个线程中发布的数据将是几条消息。
  4. 当订阅者连接时,尽快获取每条消息是很重要的。
  5. 如果订阅者未连接,那么我不想让发布者线程保持阻塞,理想情况下它可以在 1-2 秒左右超时。

慢连接问题:

仅运行 1000 多个线程(发布者)1 或 2 次,我就获得了订阅者中的所有数据。添加几毫秒的睡眠解决了这个问题,所以我有 99.9% 的把握我是众所周知的慢连接器综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能是可变的,我希望尽快将数据发送给订阅者。

我对解决这个问题的想法和实验代码:

我的解决方案基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000 毫秒。发布者连接后,我添加了一个recv()调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成,并且我可以发送数据而不会丢失任何数据(除非订阅者发生错误但我不在乎)。

如果我没有收到任何订阅消息,那么在 1000 毫秒recv()内超时并且线程被终止。

这是python(pyzmq)中用于测试此实现的示例代码(对于发布者,我不使用线程,而是使用while循环并同时运行多个发布者),它可以按我的意愿工作:

发布者.py:

import zmq

def main():
    """ main method """

    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO, 1000)

        # Waiting for 1000ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
        except Exception as e:
            print(e, flush=True)

        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()    

订阅者.py:

import zmq

def main():
    """ main method """

    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber connects to %s' % (uri), flush=True)

    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)

if __name__ == "__main__":
    main()

我的问题:

解决方案就这么简单吗?如果有订阅者处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速加入者有关)?

4

1 回答 1

1

“解决方案就这么简单吗?”

恰恰相反。对于上面发布的内容,解决方案相对于迄今为止发布的用例要求过于复杂。

a) 鉴于上述要求tcp://,当位于同一主机上属于同一进程的线程之间进行通信时,可以消除与 ISO-OSI-L3 传输类的设置和维护相关的所有成本。而是选择超快速、无堆栈、内存映射的inproc://传输类来避免所有这些低效率。ZeroMQ API v4.0+ 在设置inproc://-TransportClass{ .bind() | .connect() }出现顺序时也没有其他条件的舒适性,因此我们可以享受零拷贝“传输”消息的最大 MEM 映射超低延迟标记(无需移动一个字节的内存数据) - 很酷,不是吗?(除非您需要注入 MITM 协议嗅探,否则请删除tcp://过度杀伤力)

b) 考虑到上述要求,传递几个消息,其中“静态”SUB端订阅所有消息,是对PUB/SUB可扩展正式通信模式原型的极大低效使用。您的代码必须支付所有成本来设置一个新的SUB-instance,然后它会爬行以设置一个有效的连接(通过tcp://-TransportClass' 堆栈,希望在 a 下删除),然后争吵以设置一个新的 TOPIC 过滤器(被它操作在早期版本中的 SUB 端或较新的 ZeroMQ 版本中的 PUB 端——所有这些都是为了接收所有消息而付出的巨大代价——即根本没有过滤)。使用更轻量级的多节点方式可以实现相同的正式服务-PUSH/PULL-在一个节点上。如果没有其他需要任何反向/双向/更复杂的正式沟通,这只是一个PUSH/PULL能够完成所要求的工作。

c) 鉴于上述要求,您的重点似乎是过早地通过连接发送消息而不会丢失消息。在 ZeroMQ 设置中有用于确定这一点的工具,但您不小心使用它们:

  • usingzmq.IMMEDIATE可以使用 AccessNode 的阻塞状态,以防没有现成的连接正在工作(或曾经)
  • 使用返回码 & errno(或zmq.errno()对于 POSIX 不兼容的操作系统/Win32 等)处理可以帮助您的代码检测 & 对在整个跨度的“自治代理网络”中发生的任何和所有特定情况作出反应它在中的生命周期(无论代理实际上是“物理上”分布还是位于同一位置,就像这里的情况一样)。不失控是这里的核心责任。什么是控制代码,它在失去控制状态下自锁,甚至无法控制自己;)?

d) 永远不要使用阻塞形式的{ .recv() | .send() | .poll() | ... }-methods。教科书示例是专业信号/消息传递元平面实现应该是什么样子的反模式。确实从不 - 参考。第 5) 项。

e) 更好地重复使用Context()-instance,而不是像上面所描绘的那样使其成为消耗品/一次性用品。线程可以自由地共享一个预先实例化的Context()引擎,避免下一个巨大的重复性附加开销成本,如果Context()为每个分叉重新实例化一个消耗品/一次性用品,只是一个短暂的对等客户端线程。

f)如果有人知道更好的解决方案,请随时通知我们:o)

评论中的问题

a)
订阅者将在另一台机器上,所以我认为tcp://是解决方案。*

当然,这里是NP。{ pgm:// | epgm:// | tipc:// }- 如果进一步进入更高的性能水平方向,运输在这里可能会很有趣

b)
订阅者将通过XPUB套接字将消息转发给其他订阅者。PUSH/PULL可以工作,但如果我想将这些订阅及其过滤器传递给初始发布者并在源头过滤一些消息,我必须使用PUB/SUB模式。

好吧,O / P中没有提到。XPUBs/ s 的任何分层都XSUB可以正常工作,问题出在连接管理级别

c)
澄清一下,只有当有订阅者时,不丢失消息才重要。你能再解释一下这部分吗?

当然,如果在 RTO 连接的链接上没有可用的订阅者,准备好“通过网络”立即交付,则永远无法交付任何消息(并且可能会默默地丢弃,这是您试图与之抗争的,不是吗? ?)。这就是zmq.IMMEDIATE可以通过调用 - 方法来管理的.setsockopt()

于 2021-01-01T10:40:00.713 回答