我正在使用 Stomp.py 连接到标准 ACtiveMQ 服务器。我正在模拟接收器崩溃的情况,我希望能够重新启动它,并在导致它崩溃的消息之后继续从消息中运行。
我创建了两个示例脚本:
- putMessagesToQueue.py - 这会将 56 条消息放入目的地
- readMessagesFromQueue.py - 这将从目的地读取消息。如果它读取第 6 条消息,它将引发异常。每条消息需要 1 秒来处理
我运行测试的步骤:
- 我运行 putMessagesToQueue.py
- 我运行 readMessagesFromQueue.py - 它成功处理 5 条消息,并在消息 6 中引发异常
- 我终止 readMessagesFromQueue.py (ctrl-c)
- 我再次运行 readMessagesFromQueue.py
对于我在步骤 4 中想要的行为,我希望它从消息 7 开始处理。
但是我没有看到这一点。如果接收者使用 ack='auto' 订阅,那么在第 4 步中它不会处理任何消息 - 所有消息都从队列中消失,我丢失了 50 条消息!
如果我使用 ack='client' 或 ack='client-individual' 然后在第 4 步,它会从头开始,然后在消息 6 上再次崩溃。
这似乎表明接收者不是一次处理消息,而是一次接收每条消息并遍历每条消息。我不想要这种行为,因为我想扩大到运行 5 个接收器并且我希望分配负载。目前,我开始的第一个接收者接收所有消息并开始翻阅它们,而接收者 2-4 只是等待新消息。我希望接收者一次接收一条消息!
任何人都可以就我如何实现这个错误给出任何提示:
资源
putMessagesToQueue.py
import stomp
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
for x in range(0,5):
conn.send(body="OK-BEFORE-CRASH", destination=destination)
conn.send(body="CRASH", destination=destination)
for x in range(0,50):
conn.send(body="OK-AFTER-CRASH", destination=destination)
readMessagesFromQueue.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
def __init__(self, processMessage):
self.processMessage = processMessage
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
self.processMessage(headers, message)
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")
更新 001
通过将接收函数更改为使用 ack='client-individual' 并手动发送 ack 消息,我设法获得了上述所需的行为。(见下面的新版本)
但我仍然无法让接收者一次处理一条消息。这可以通过以下步骤来证明:
- 我运行 putMessagesToQueue.py
- 我运行 readMessagesFromQueue2.py - 它将开始处理
- 在新终端中运行 readMessagesFromQueue2.py
首先,第二个 readMessagesFromQueue2 什么都不做,直到第一个崩溃,然后它开始接收消息。我希望接收者的两个实例从一开始就阅读消息。
readMessagesFromQueue2.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
conn = None
def __init__(self, processMessage, conn):
self.processMessage = processMessage
self.conn = conn
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
try:
self.processMessage(headers, message)
finally:
self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")