8

我正在尝试使用pythoninzeroMQ模式,每隔几秒PUSH / PULL发送一次大小为4[MB]的消息。

出于某种原因,虽然看起来所有消息都已发送,但似乎只有其中一些消息已被服务器接收。我在这里想念什么?

这是客户端的代码-client.py

import zmq
import struct

# define a string of size 4[MB] 
msgToSend = struct.pack('i', 45) * 1000 * 1000 

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5000")

# print the message size in bytes
print len(msgToSend)

socket.send(msgToSend)

print "Sent message"

这是服务器的代码——server.py

import zmq
import struct

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://127.0.0.1:5000")

while True:
    # receive the message
    msg = socket.recv()

    print "Message Size is: {0} [MB]".format( len(msg) / (1000 * 1000) )

我错过了什么?如何保证消息始终发送且不丢失?

以防万一,我使用的是Ubuntu 10.0432 位 Core Duo 机器和 2[GB] RAM。

注意:我尝试使用相同的示例RabbitMQ,一切正常 - 没有消息丢失。我很困惑,因为我经常听到赞美zeroMQ。为什么在RabbitMQ成功的地方失败了?

4

2 回答 2

14

问题在于,当程序退出时,套接字会立即关闭,并以有效的 LINGER 为 0 进行垃圾收集(即,它会丢弃任何未发送的消息)。这对于较大的消息来说是一个问题,因为它们发送所需的时间比套接字被垃圾收集所需的时间要长。

sleep(0.1)您可以通过在程序退出之前放置 a 来避免这种情况(以延迟套接字和上下文被垃圾收集)。

socket.setsockopt(zmq.LINGER, -1)(这是默认设置)应该避免这个问题,但由于某种原因我没有时间调查。

于 2011-07-15T07:59:51.510 回答
1

可以想象您的内存不足(取决于您发送消息的方式,是否足够快地消耗它们等)。您可以使用socket.setsockopt(zmq.HWM)设置HWM为合理的值并防止 zeromq 在传出缓冲区中存储太多消息。考虑到这一点,考虑稍微修改的示例:

# server
...
counter = 0
while True:
    ...receive the message
    counter += 1
    print "Total messages recieved: {0}".format(counter)

# client
socket.setsockopt(zmq.HWM, 8)
for i in range(1000):
    socket.send(msgToSend)

然后运行 ​​10 个测试客户端:

for i in {1..10}; do
    python client.py &
done

从服务器您可以看到所有消息都已收到:

Total messages recieved: 9998
Total messages recieved: 9999
Total messages recieved: 10000
于 2011-07-15T04:42:53.163 回答