我的用例:
- 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
- Publishers 将在不同的线程中初始化为客户端(连接到端口)。
- 在每个线程中发布的数据将是几条消息。
- 当订阅者连接时,尽快获取每条消息是很重要的。
- 如果订阅者未连接,那么我不想让发布者线程保持阻塞,理想情况下它可以在 1-2 秒左右超时。
慢连接问题:
仅运行 1000 多个线程(发布者)1 或 2 次,我就获得了订阅者中的所有数据。添加几毫秒的睡眠解决了这个问题,所以我有 99.9% 的把握我是众所周知的慢连接器综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能是可变的,我希望尽快将数据发送给订阅者。
我对解决这个问题的想法和实验代码:
我的解决方案基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000 毫秒。发布者连接后,我添加了一个recv()
调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成,并且我可以发送数据而不会丢失任何数据(除非订阅者发生错误但我不在乎)。
如果我没有收到任何订阅消息,那么在 1000 毫秒recv()
内超时并且线程被终止。
这是python(pyzmq)中用于测试此实现的示例代码(对于发布者,我不使用线程,而是使用while循环并同时运行多个发布者),它可以按我的意愿工作:
发布者.py:
import zmq
def main():
""" main method """
i = 0
while True:
# Prepare context and publisher
context = zmq.Context()
publisher = context.socket(zmq.XPUB)
publisher.connect("tcp://0.0.0.0:5650")
publisher.setsockopt(zmq.RCVTIMEO, 1000)
# Waiting for 1000ms to get a subscription
i = i + 1
try:
publisher.recv()
# Send the message
publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
except Exception as e:
print(e, flush=True)
# Terminate socket and context
publisher.close()
context.term()
if i >= 10000:
break
if __name__ == "__main__":
main()
订阅者.py:
import zmq
def main():
""" main method """
# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
uri = "tcp://0.0.0.0:5650"
subscriber.bind(uri)
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
print('Subscriber connects to %s' % (uri), flush=True)
# Receive messages
i = 0
while True:
[topic, data] = subscriber.recv_multipart()
i = i + 1
print("%s: %s %s" % (i, topic, data), flush=True)
if __name__ == "__main__":
main()
我的问题:
解决方案就这么简单吗?如果有订阅者处于活动状态,我是否遗漏了任何会导致数据丢失的内容(与慢速加入者有关)?