我正在使用 Pika 处理来自 RabbitMQ 的数据。当我似乎遇到不同类型的问题时,我决定编写一个小型测试应用程序来看看我如何处理断开连接。
我编写了这个测试应用程序,它执行以下操作:
- 连接Broker,重试直到成功
- 连接后创建队列。
- 使用此队列并将结果放入 python Queue.Queue(0)
- 从 Queue.Queue(0) 中获取项目并将其生成回代理队列。
我注意到有两个问题:
- 当我从一个连接到另一台主机(在 vm 内)上的 rabbitmq 的主机上运行我的脚本时,这个脚本会随机退出而不会产生错误。
- 当我在安装了 RabbitMQ 的同一主机上运行我的脚本时,它运行良好并继续运行。
这可能是因为网络问题,数据包丢失,尽管我发现连接不是很健壮。
当脚本在 RabbitMQ 服务器上本地运行并且我杀死 RabbitMQ 时脚本退出并出现错误:“ERROR pika SelectConnection: Socket Error on 3: 104”
所以看起来我无法让重新连接策略正常工作。有人可以看看代码,看看我做错了什么吗?
谢谢,
周杰伦
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant connect. Reason: %s' % err)
time.sleep(1)
self.daemon=True
def run(self):
while True:
self.submitData(self.from_broker.get(block=True))
pass
def on_connected(self,connection):
connection.channel(self.on_channel_open)
def on_channel_open(self,new_channel):
self.channel = new_channel
self.channel.queue_declare(queue='sandbox', durable=True)
self.channel.basic_consume(self.processData, queue='sandbox')
def processData(self, ch, method, properties, body):
self.logging.info('Received data from broker')
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.from_broker.put(body)
def submitData(self,data):
self.logging.info('Submitting data to broker.')
self.channel.basic_publish(exchange='',
routing_key='sandbox',
body=data,
properties=self.properties)
if __name__ == '__main__':
format=('%(asctime)s %(levelname)s %(name)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=format)
broker=Broker()
broker.start()
try:
broker.connection.ioloop.start()
except Exception as err:
print err