我正在使用 python stomp 库编写一个 python 脚本来连接和订阅 ActiveMQ 消息队列。
我的代码与文档“处理断开连接”中的示例非常相似,其中添加了将计时器放置在循环中以供长时间运行的侦听器使用。
侦听器类正在接收和处理消息。但是几分钟后,连接断开,然后侦听器停止接收消息。
问题:
正在调用 on_disconnected 方法,该方法运行 connect_and_subscribe() 方法,但是在发生这种情况后,侦听器似乎停止工作。也许监听器需要重新初始化?脚本再次运行后,监听器被重新创建,它再次开始接收消息,但是定期再次运行脚本是不切实际的。
问题 1:如何设置它以自动重新连接并重新创建侦听器?
问题 2:有没有更好的方法来初始化长时间运行的侦听器而不是超时循环?
import os, time, datetime, stomp
_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
sub_id = 1
def connect_and_subscribe(conn):
conn.connect(_user, _password, wait=True)
conn.subscribe(destination=_queue, id=sub_id, ack='client-individual')
print('connect_and_subscribe connecting {} to with connection id {}'.format(_queue, sub_id), flush=True)
class MqListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
self.sub_id = sub_id
print('MqListener init')
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, headers, body):
print('received a message headers "%s"' % headers)
print('message body "%s"' % body)
time.sleep(1)
print('processed message')
print('Acknowledging')
self.conn.ack(headers['message-id'], self.sub_id)
def on_disconnected(self):
print('disconnected! reconnecting...')
connect_and_subscribe(self.conn)
def initialize_mqlistener():
conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
conn.set_listener('', MqListener(conn))
connect_and_subscribe(conn)
# https://github.com/jasonrbriggs/stomp.py/issues/206
while conn.is_connected():
time.sleep(2)
conn.disconnect()
if __name__ == '__main__':
initialize_mqlistener()