1

我有一堆 celery 任务,它们获取结果并将它们发布到 RabbitMQ 消息队列。发布的结果可能会变得非常大(最多几兆)。关于在 RabbitMQ 消息中放入大量数据是否是一个好主意的观点参差不齐,但我在其他情况下看到了这种方法,只要控制内存,它似乎就可以工作。

但是,对于我当前的一组任务,rabbit 似乎只是在丢弃似乎太大的消息。我已将其简化为一个相当简单的测试用例:

#!/usr/bin/env python
import string
import random
import pika
import os
qname='examplequeue'
connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='mq.example.com'))
channel = connection.channel()

channel.queue_declare(queue=qname,durable=True)

N=100000
body = ''.join(random.choice(string.ascii_uppercase) for x in range(N))

promise = channel.basic_publish(exchange='', routing_key=qname, body=body, mandatory=0, immediate=0, properties=pika.BasicProperties(content_type="text/plain",delivery_mode=2))

print " [x] Sent 'Hello World!'"
connection.close()

我有一个 3 节点的 RabbitMQ 集群,以及mq.example.com每个节点的循环。客户端在 Ubuntu 12.04 上使用 Pika 0.9.5,RabbitMQ 集群在 Erlang R14B04 上运行 RabbitMQ 2.8.7。

执行此脚本会打印 print 语句并退出,不会引发任何异常。该消息永远不会出现在 RabbitMQ 中。

更改N10000使其按预期工作。

为什么?

4

2 回答 2

2

我想你对 RabbitMq 中的 tcp-backpressure mechanizm 有问题。您可以阅读有关http://www.rabbitmq.com/memory.html的信息。我看到了两种解决这个问题的方法:

  1. 添加 tcp-callback 并重新连接来自 rabbit 的每个 tcp-call
  2. 在发送给rabbit之前使用压缩消息,这将更容易推送给rabbit。
def compress(s):
     return binascii.hexlify(zlib.compress(s))

def decompress(s):
    return zlib.decompress(binascii.unhexlify(s))
于 2012-11-17T11:04:28.420 回答
0

这就是我发送和接收数据包的方式。它比 hexlify 更有效,因为 base64 可能使用一个字节,而 hexlify 需要两个字节来表示一个字符。

import zlib
import base64

def hexpress(send: str):
    print(f"send: {send}")
    bsend = send.encode()
    print(f"byte-encoded send: {bsend}")
    zbsend = zlib.compress(bsend)
    print(f"zipped-byte-encoded-send: {zbsend}")
    hzbsend = base64.b64encode(zbsend)
    print(f"hex-zip-byte-encoded-send: {hzbsend}")
    shzbsend = hzbsend.decode()
    print(f"string-hex-zip-byte-encoded-send: {shzbsend}")
    return shzbsend

def hextract(recv: str):
    print(f"string-hex-zip-byte-encoded-recv: {recv}")
    zbrecv = base64.b64decode(recv)
    print(f"zipped-byte-encoded-recv: {zbrecv}")
    brecv = zlib.decompress(zbrecv)
    print(f"byte-encoded-recv: {brecv}")
    recv = brecv.decode()
    print(f"recv: {recv}")
    return recv

print("sending ...\n")
send = "hello this is dog"
packet = hexpress(send)
print("\nover the wire -------->>>>>\n")
print("receiving...\n")
recv = hextract(packet)
于 2019-02-09T08:36:07.953 回答