1

我正在使用 Stomp.py 连接到标准 ACtiveMQ 服务器。我正在模拟接收器崩溃的情况,我希望能够重新启动它,并在导致它崩溃的消息之后继续从消息中运行。

我创建了两个示例脚本:

  • putMessagesToQueue.py - 这会将 56 条消息放入目的地
  • readMessagesFromQueue.py - 这将从目的地读取消息。如果它读取第 6 条消息,它将引发异常。每条消息需要 1 秒来处理

我运行测试的步骤:

  1. 我运行 putMessagesToQueue.py
  2. 我运行 readMessagesFromQueue.py - 它成功处理 5 条消息,并在消息 6 中引发异常
  3. 我终止 readMessagesFromQueue.py (ctrl-c)
  4. 我再次运行 readMessagesFromQueue.py

对于我在步骤 4 中想要的行为,我希望它从消息 7 开始处理。

但是我没有看到这一点。如果接收者使用 ack='auto' 订阅,那么在第 4 步中它不会处理任何消息 - 所有消息都从队列中消失,我丢失了 50 条消息!

如果我使用 ack='client' 或 ack='client-individual' 然后在第 4 步,它会从头开始,然后在消息 6 上再次崩溃。

这似乎表明接收者不是一次处理消息,而是一次接收每条消息并遍历每条消息。我不想要这种行为,因为我想扩大到运行 5 个接收器并且我希望分配负载。目前,我开始的第一个接收者接收所有消息并开始翻阅它们,而接收者 2-4 只是等待新消息。我希望接收者一次接收一条消息!

任何人都可以就我如何实现这个错误给出任何提示:

资源

putMessagesToQueue.py

import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

更新 001

通过将接收函数更改为使用 ack='client-individual' 并手动发送 ack 消息,我设法获得了上述所需的行为。(见下面的新版本)

但我仍然无法让接收者一次处理一条消息。这可以通过以下步骤来证明:

  1. 我运行 putMessagesToQueue.py
  2. 我运行 readMessagesFromQueue2.py - 它将开始处理
  3. 在新终端中运行 readMessagesFromQueue2.py

首先,第二个 readMessagesFromQueue2 什么都不做,直到第一个崩溃,然后它开始接收消息。我希望接收者的两个实例从一开始就阅读消息。

readMessagesFromQueue2.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")
4

1 回答 1

2

大量阅读不同的文档,我发现了问题。

ActiveMQ 有一个选项预取大小 - https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html

如果您有很少的消息需要很长时间来处理,您可以将其设置为 1。这在其他情况下并不合适。

我可以使用以下行在 stopm.py 中执行此操作: conn.subscribe(destination=destination, id=1, ack='client-individual', headers={'activemq.prefetchSize': 1})

因此,使用手动或自动确认既不存在也不存在。关键是将预取限制为 1。

于 2020-05-05T11:10:35.367 回答