编辑:
主要问题是第 3 方 rabbitmq 机器似乎时不时地杀死空闲连接。那是我开始收到“Broken Pipe”异常的时候。获得通讯的唯一途径。恢复正常是让我终止进程并重新启动它们。我想有更好的方法吗?
--
我在这里有点迷路了。我正在连接到第 3 方 RabbitMQ 服务器以将消息推送到。时不时地,他们机器上的所有套接字都被丢弃,我最终得到一个“断管”异常。
我被告知要在我的代码中实现心跳检查,但我不确定具体如何。我在这里找到了一些信息:http: //kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0但没有真正的示例代码。
我只需要在连接字符串中添加“?heartbeat = x”吗?Kombu 会做剩下的事情吗?我看到我需要在“x/2”处调用“Connection.heartbeat_check()”。我应该创建一个定期任务来调用它吗?如何重新建立连接?
我正在使用:
- 芹菜==3.0.12
- 海带==2.5.4
我的代码现在看起来像这样。调用一个简单的 Celery 任务将消息发送到第 3 方 RabbitMQ 服务器(删除了日志和注释以保持简短,足够基本):
class SendMessageTask(Task):
name = "campaign.backends.send"
routing_key = "campaign.backends.send"
ignore_result = True
default_retry_delay = 60 # 1 minute.
max_retries = 5
def run(self, send_to, message, **kwargs):
payload = "Testing message"
try:
conn = BrokerConnection(
hostname=HOSTNAME,
port=PORT,
userid=USER_ID,
password=PASSWORD,
virtual_host=VHOST
)
with producers[conn].acquire(block=True) as producer:
publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
publish(
body=payload,
routing_key=OUT_ROUTING_KEY,
delivery_mode=2,
exchange=EXCHANGE,
serializer=None,
content_type='text/xml',
content_encoding = 'utf-8'
)
except Exception, ex:
print ex
感谢您的任何帮助。