1

对 zeromq 来说相当新,并试图让一个基本的 pub/sub 工作。当我运行以下命令(在发布之前开始订阅)时,发布者完成但订阅者挂起并没有收到所有消息 - 为什么?

我认为套接字正在关闭但消息已发送?有没有办法确保收到所有消息?

出版商:

import zmq
import random
import time
import tnetstring

context=zmq.Context()
socket=context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

y=0
for x in xrange(5000):
    st = random.randrange(1,10) 
    data = []
    data.append(random.randrange(1,100000))
    data.append(int(time.time()))
    data.append(random.uniform(1.0,10.0))

    s = tnetstring.dumps(data)
    print 'Sending ...%d %s' % (st,s)

    socket.send("%d %s" % (st,s))
    print "Messages sent: %d" % x
    y+=1

print '*** SERVER FINISHED. # MESSAGES SENT = ' + str(y)

订户:-

import sys
import zmq
import tnetstring

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.connect("tcp://localhost:5556")
filter = "" # get all messages

socket.setsockopt(zmq.SUBSCRIBE, filter)

x=0
while True:
    topic,data = socket.recv().split()
    print "Topic: %s, Data = %s. Total # Messages = %d" % (topic,data,x)
    x+=1
4

2 回答 2

0

在您的特定情况下,您可能希望使用 socket.close() 和 context.term() 进行调查。它将阻塞,直到所有消息都发送完毕。你也有一个缓慢的加入者的问题。您可以在绑定之后、开始发布之前添加睡眠。这在测试用例中有效,但您会想真正了解解决方案与创可贴的区别。

您需要将 PUB/SUB 模式想象成收音机。发送者和接收者都是异步的。即使没有人在收听,Publisher 也会继续发送。订阅者只有在监听时才会接收数据。如果中途网络宕机,数据就会丢失。

您需要了解这一点才能设计您的消息。例如,如果您将消息设计为“幂等”,那么丢失数据并不重要。这方面的一个例子是状态类型消息。如果您有任何以前的状态都没有关系。最新的是正确的,消息丢失无关紧要。这种方法的好处是您最终会得到一个更健壮和高性能的系统。缺点是您无法以这种方式设计消息。

您的示例包括一种不需要丢失的消息。另一种类型的消息将是事务性的。例如,如果您只是发送了系统中更改内容的增量,您将无法丢失这些消息。数据库复制通常以这种方式管理,这就是数据库复制通常如此脆弱的原因。要尝试提供保证,您需要做几件事。一件事是添加持久缓存。发送的每条消息都需要记录在持久缓存中。每条消息都需要分配一个唯一的 id(最好是一个序列),以便客户端可以确定他们是否缺少消息。需要为客户端添加第二个套接字 (ROUTER/REQ) 以单独请求丢失的消息。或者,您可以只使用辅助套接字请求通过 PUB/SUB 重新发送。然后客户端都将再次收到消息(这适用于多播版本)。客户会忽略他们已经看到的消息。注意:这遵循 ZeroMQ 指南中的 MAJORDOMO 模式。

另一种方法是使用 ROUTER/DEALER 套接字创建您自己的代理。当 ROUTER 套接字看到每个 DEALER 连接时,它将存储其 ID。当 ROUTER 需要发送数据时,它会遍历所有客户端 ID 并发布消息。每条消息都应该包含一个序列,以便客户端可以知道要请求哪些丢失的消息。注意:这是对来自linkedin 的Kafka 的一种重新实现。

于 2013-06-08T06:26:40.813 回答
0

在 ZeroMQ 中,客户端和服务器总是尝试重新连接;如果另一方断开连接,它们就不会掉线(因为在许多情况下,如果另一方再次出现,您希望它们恢复通话)。因此,在您的测试代码中,客户端只会等到服务器再次开始发送消息,除非您recv()在某个时候停止发送消息。

于 2013-05-20T09:40:25.147 回答