我正在测试 IBM MQ 的性能(在本地 docker 容器中运行最新版本)我使用持久队列。
在生产者方面,我可以通过并行运行多个生产应用程序来获得更高的吞吐量。
但是,在消费者方面,我无法通过并行化消费者进程来增加吞吐量。相反,多个消费者的吞吐量甚至比单个消费者的吞吐量还要差。
消费表现不佳的原因可能是什么?
这不应该是由于硬件限制,因为我将消耗与生产进行比较,并且我只做了消息消耗而没有任何其他处理。
GET 是否为每条消息执行提交?不过,我在 PyMQI 中没有找到任何明确的提交方法。
put_demo.py
#!/usr/bin/env python3
import pymqi
import time
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
message = b'Hello from Python!'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
for i in range(nb_messages):
try:
queue.put(message)
except pymqi.MQMIError as e:
print(f"Fatal error: {str(e)}")
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages/(t1-t0):.0f} nb_message_produced: {nb_messages}")
get_demo.py
#!/usr/bin/env python3
import pymqi
import time
import os
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
nb_messages_consumed = 0
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
gmo = pymqi.GMO(Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING)
gmo.WaitInterval = 1000
while nb_messages_consumed < nb_messages:
try:
msg = queue.get(None, None, gmo)
nb_messages_consumed += 1
except pymqi.MQMIError as e:
if e.reason == 2033:
# No messages, that's OK, we can ignore it.
pass
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")
run results
> for i in {1..10}; do ./put_demo.py & done
tps: 385 nb_message_produced: 1000
tps: 385 nb_message_produced: 1000
tps: 383 nb_message_produced: 1000
tps: 379 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000
> for i in {1..10}; do ./get_demo.py & done
tps: 341 nb_messages_consumed: 1000
tps: 339 nb_messages_consumed: 1000
tps: 95 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
get_demo.py updated version using syncpoint and batch commit
#!/usr/bin/env python3
import pymqi
import time
import os
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
commit_batch = 10
nb_messages_consumed = 0
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
gmo = pymqi.GMO(Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING | pymqi.CMQC.MQGMO_SYNCPOINT)
gmo.WaitInterval = 1000
while nb_messages_consumed < nb_messages:
try:
msg = queue.get(None, None, gmo)
nb_messages_consumed += 1
if nb_messages_consumed % commit_batch == 0:
qmgr.commit()
except pymqi.MQMIError as e:
if e.reason == 2033:
# No messages, that's OK, we can ignore it.
pass
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")
谢谢。