5

我想限制我的 ZeroMQ 消息队列在 Python 应用程序中消耗的内存量。我知道设置高水位线会限制发送方排队的数量,但是有没有办法控制接收方排队的数量?Python ZeroMQ 绑定似乎将其设置为无限制。

我的测试场景:我有两个用于测试的 python 终端。一种是接收器:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")

另一个是发件人:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
...  print num
...  socket.send(str(num))
...  num = num + 1
... 

socket.recv()在接收端运行了几次以确保队列正常工作,但除此之外,让两个终端就坐在那里。发送循环似乎永远不会阻塞,并且接收提示似乎有越来越多的内存占用。

4

3 回答 3

3

与 ZeroMQ 的文档相矛盾的是,高水位线需要在PUSH边和PULL边上设置。一旦我改变了PULL,它工作得更好。新PULL代码是:

Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04) 
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")
于 2012-02-22T16:45:31.330 回答
1

通过zmq.SNDBUFzmq.RCVBUF选项,您可以设置缓冲区大小的限制


此外,我zmq.CONFLATE在接收方使用选项将 ZeroMQ 队列大小限制为一个:

这是 ZMQ 的示例PUSH/PULL

发件人 ( zmq.PUSH):

def create_pub_socket(ip, port):
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 1)
        zmq_address = "tcp://{}:{}".format(ip, port)
        socket.connect(zmq_address)
        return socket

    except zmq.ZMQError as exp:
        print(exp)
        return False

sock = create_push_socket('127.0.0.1', 5558)
if sock:
    sock.send_json({'a': 1})

吸气剂 ( zmq.PULL):

def listen(self):
    sock = None
    try:
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.setsockopt(zmq.RCVHWM, 1)
        sock.setsockopt(zmq.CONFLATE, 1)  # last msg only.
        sock.bind("tcp://*:5558")

    except zmq.ZMQError:
        logger.captureException()

    configs = None
    while configs is None:
        if sock:
            configs = sock.recv_json()
            time.sleep(1e-1)
        else:
            time.sleep(5)
            listen()  # Recursive.
listen()
于 2018-09-16T11:33:26.670 回答
0

实际上,文档是这样说的:

“当 ZMQ_PUSH 套接字由于已达到所有下游节点的高水位标记而进入异常状态时,或者如果根本没有下游节点,则套接字上的任何 zmq_send(3) 操作都应阻塞,直到异常状态结束或至少有一个下游节点可用于发送;消息不会被丢弃。”

http://api.zeromq.org/2-1:zmq-socket

这完全表明您可以(并且应该)为下游节点(又名拉)设置高水位线,并且可能暗示将其设置在推送端将没有效果(尽管我怀疑这不是真的,因为仍然存在下游节点可用但消息进来的速度比发送速度快的情况。)

于 2015-03-25T17:19:02.543 回答