14

我注意到 zeromq PUB 套接字在连接时会缓冲所有传出数据,例如

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

子绑定在这些消息之后,它们应该被丢弃,因为如果没有人连接到它,PUB 应该丢弃消息。但它不是丢弃消息,而是缓冲所有消息。

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

如您所见,那些“不应丢弃消息”由套接字缓冲,一旦连接,它将它们刷新到 SUB 套接字。如果我在 PUB 套接字上绑定,并在 SUB 套接字上连接,那么它可以正常工作。

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

你只能看到输出

'hi'

这种奇怪的行为导致了一个问题,它缓冲了一个连接套接字上的所有数据,我有两台服务器,服务器A向服务器B发布数据

Server A -- publish --> Server B

如果服务器 B 上线,它工作正常。但是如果我启动了服务器 A 而没有启动服务器 B 怎么办?

结果,服务器 A 上的连接 PUB 套接字保留了所有这些数据,内存使用率越来越高。

问题来了,这种行为是错误还是功能?如果是功能,我在哪里可以找到提到此行为的文档?以及如何停止连接的 PUB 套接字缓冲所有数据?

谢谢。

4

6 回答 6

7

套接字是阻塞还是丢弃消息取决于ZMQ::Socket 文档中描述的套接字类型(以下重点是我的):

ZMQ::HWM:检索高水位标记

ZMQ::HWM 选项应检索指定套接字的高水位线。高水位线是对未完成消息的最大数量的硬限制,0MQ 应在内存中为指定套接字正在与之通信的任何单个对等方排队。

如果已达到此限制,则套接字应进入异常状态,并且根据套接字类型,0MQ 应采取适当的措施,例如阻止或丢弃已发送的消息。请参阅 ZMQ::Socket 中的各个套接字描述,了解对每种套接字类型采取的确切操作的详细信息。

默认的 ZMQ::HWM 值为零表示“无限制”。

您可以通过查看套接字类型的文档来查看它是否会阻塞或丢弃,该套接字类型ZMQ::HWM option action将为BlockDrop

的动作ZMQ::PUBDrop,所以如果它没有下降,你应该检查 HWM(高水位线)值并注意警告默认 ZMQ::HWM 值为零表示“无限制”,这意味着它不会进入异常状态直到系统内存不足(此时我不知道它的行为方式)。

于 2012-01-22T03:24:01.870 回答
4

我觉得这种行为是 zmq_connect() 的语义。即:当 zmq_connect() 返回成功时,连接在概念上已建立,因此您的连接 PUB 开始排队消息而不是 drop

以下摘自“ ZMQ 指南”是对此的提示:

理论上,对于 ØMQ 套接字,哪一端连接,哪一端绑定都没有关系。但是对于 PUB-SUB 套接字,如果绑定 SUB 套接字并连接 PUB 套接字,则 SUB 套接字可能会收到旧消息,即在 SUB 启动之前发送的消息。这是绑定/连接工作方式的产物。如果可以的话,最好绑定PUB并连接SUB。

zmq_connect () 中的以下部分有一些提示,如下所示:

与传统插座的主要区别

一般来说,传统的套接字为面向连接的可靠字节流 (SOCK_STREAM) 或无连接不可靠数据报 (SOCK_DGRAM) 提供同步接口。相比之下,ØMQ 套接字呈现了异步消息队列的抽象,其确切的队列语义取决于所使用的套接字类型。传统套接字传输字节流或离散数据报,ØMQ 套接字传输离散消息。

ØMQ 套接字是异步的,这意味着物理连接建立和拆除、重新连接和有效传递的时间对用户是透明的,并由ØMQ 自己组织。此外,如果对等方无法接收消息,则消息可能会排队。

于 2012-05-04T09:48:54.210 回答
1

他们在套接字上设置 HWM 选项。

于 2012-01-21T23:28:39.800 回答
0

所以 bind() 和 connect() 会导致两种不同的行为。你为什么不直接选择你喜欢的那个(看起来像 bind())并使用它呢?

实际上,ZeroMQ 的一个特性通常是缓冲传出消息,直到建立连接。

于 2012-01-21T16:41:22.003 回答
0

您应该能够使用 pub 套接字的 hwm 设置在套接字中设置高水位线。它允许您定义保留多少消息。

于 2012-01-21T20:45:46.427 回答
0

这是一个可能会有所帮助的黑客...

将您设置ZMQ::HWM为固定数字,例如 10。连接后,recv在循环中调用订阅者套接字的方法,直到它丢弃所有缓冲的消息,然后才开始您的主接收循环。

于 2013-09-22T01:06:21.497 回答