-1

我想将消息从 AMQ 拉到 python。我想使用 python 进行批处理(例如,如果队列中有 1000 条消息,我需要每 100 条消息出列并处理它们,然后获取接下来的 100 条消息并处理......直到所有消息都出列。)

这是我的 batchListener 的 python 代码:

class BatchEventListner(stomp.ConnectionListener):
     def on_message(self, headers, message):
         print('received a message "%s"' % message)


batchLsnr = BatchEventListner()
self.conn = stomp.Connection(host_and_ports=hosts)
self.conn.set_listener('', batchLsnr)
self.batchLsnr = batchLsnr
self.conn.start()
self.conn.connect('username', 'password', wait=True)
self.conn.subscribe(destination='/queue/' + self.queue_name, id=1, ack='auto')

我写了一个模拟器来将消息推送到 AMQ。一旦我将 1000 条消息推送到 ActiveMQ。当消费者启动时,python 代码将开始从 ActiveMQ 中提取数据,但 python 代码一次提取 100 多条消息。(仅对 100 条消息进行处理,但有超过 100 条消息正在出队)。即,对于最后一批(100 条消息),我们在 ActiveMQ 中看不到任何消息,但消息正在 python 进程中获取。

1. stomp 在从 ActiveMQ 出队时是否持有任何消息? 2. 批量处理时 stomp 是否保存任何数据?

4

1 回答 1

0

您可能会看到prefetch的结果。尝试将框架中的activemq.prefetchSize标题设置为.SUBSCRIBE1

另外,尝试将确认模式设置为clientclient-individual。使用auto基本上会触发代理尽可能快地向客户端发送消息。

请记住,预取消息是一种性能优化,因此降低它可能会导致性能下降。当然,性能必须与其他功能因素进行权衡。我建议您进行测试和调整,直到您满足所有目标或在它们之间找到可接受的折衷方案。

于 2021-06-07T13:39:35.393 回答