2

当我从套接字发送数据然后立即关闭它(不更改默认设置)时,小数据大小会成功发送,但大数据(> 2MB)则不会。

以下是我设置接收器的方式(最后在 recv() 上阻塞):

In [1]: import zmq

In [2]: ctx = zmq.Context()

In [3]: socket = ctx.socket(zmq.PULL)

In [4]: socket.connect("ipc://@foo")

In [5]: msg = socket.recv()

然后是发件人:

In [1]: import zmq

In [2]: ctx = zmq.Context()

In [3]: socket = ctx.socket(zmq.PUSH)

In [4]: socket.bind("ipc://@foo")

In [5]: arr = bytearray([1]*100)

In [6]: len(arr)
Out[6]: 100

In [7]: socket.send(arr); socket.close()

接收器得到数据就好了:

In [5]: msg = socket.recv()

In [6]: len(msg)
Out[6]: 100

但是,如果我使用更大的消息,例如arr = bytearray([1]*int(2e6)),那么接收器会一直阻塞,等待数据。

更改 LINGER 设置似乎没有任何区别(我相信无论如何默认为无限等待)。

在发送方的发送和关闭之间添加 sleep(1) socket.send(arr); time.sleep(1); socket.close()可以解决问题:接收方正确获取数据。

如果 LINGER 默认为 -1,为什么这在没有睡眠的情况下不起作用?在处理大量数据时,发送然后立即关闭套接字的正确方法是什么?

4

1 回答 1

0

它挂起是因为它没有收到任何消息(您过早关闭了推送器)。如果您打开一个新PUSH套接字并发送一条短消息,您会看到PULL接收器工作正常并且仍在侦听新消息。

ZeroMQ 常见问题解答中,您可以阅读:

如何刷新 ZeroMQ 套接字队列中的所有消息?

没有用于从消息队列中刷新特定消息或所有消息的显式命令。您可以将 ZMQ_LINGER 设置为 0 并关闭套接字以丢弃任何未发送的消息。

因此,基本上,如果您想确保消息已发送,那么您需要在节点之间实现同步方法(即:使用REQ-REP)。

于 2015-07-19T13:49:37.487 回答