0

我正在测试 IBM MQ 的性能(在本地 Docker 容器中运行最新版本),使用的是持久化队列。

在生产者方面,我可以通过并行运行多个生产应用程序来获得更高的吞吐量。

但是,在消费者方面,我无法通过并行化消费者进程来增加吞吐量。相反,多个消费者的吞吐量甚至比单个消费者的吞吐量还要差。

消费表现不佳的原因可能是什么?

这应该不是硬件限制造成的,因为我是拿消费与生产作对比,而且消费端只是取出消息,没有做任何其他处理。

GET 是否会对每条消息都执行一次提交?不过,我在 PyMQI 中没有找到任何显式的提交方法。

put_demo.py

#!/usr/bin/env python3

import pymqi
import time

# Benchmark producer: puts `nb_messages` copies of `message` on one queue and
# reports the achieved throughput.
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
message = b'Hello from Python!'
conn_info = f'{host}({port})'
nb_messages = 1000
# Only puts that actually succeeded are counted, so the reported figures
# stay honest when some puts fail.
nb_messages_produced = 0

# The timer deliberately covers connect/disconnect as well as the puts.
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
try:
    queue = pymqi.Queue(qmgr, queue_name)

    for _ in range(nb_messages):
        try:
            queue.put(message)
        except pymqi.MQMIError as e:
            # Best effort: report the failure and keep trying the remaining
            # messages, but do not count this one as produced.
            print(f"Fatal error: {str(e)}")
        else:
            nb_messages_produced += 1

    queue.close()
finally:
    # Always drop the connection, even if something above raised.
    qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_produced/(t1-t0):.0f} nb_message_produced: {nb_messages_produced}")

get_demo.py

#!/usr/bin/env python3

import pymqi
import time
import os

# Benchmark consumer: gets `nb_messages` messages from one queue (outside
# syncpoint) and reports the achieved throughput.
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = f'{host}({port})'
nb_messages = 1000
nb_messages_consumed = 0

# The timer deliberately covers connect/disconnect as well as the gets.
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
try:
    queue = pymqi.Queue(qmgr, queue_name)
    gmo = pymqi.GMO(Options=pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING)
    gmo.WaitInterval = 1000  # milliseconds to block waiting for a message

    while nb_messages_consumed < nb_messages:
        try:
            queue.get(None, None, gmo)
        except pymqi.MQMIError as e:
            if e.reason != pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
                # Anything other than "queue empty" (2033) is a real failure.
                # Swallowing it would make this loop spin forever.
                raise
            # The wait timed out on an empty queue; that's OK, poll again.
            continue
        nb_messages_consumed += 1

    queue.close()
finally:
    # Always drop the connection, even if something above raised.
    qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")

run results

> for i in {1..10}; do ./put_demo.py & done
tps: 385 nb_message_produced: 1000
tps: 385 nb_message_produced: 1000
tps: 383 nb_message_produced: 1000
tps: 379 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000

> for i in {1..10}; do ./get_demo.py & done
tps: 341 nb_messages_consumed: 1000
tps: 339 nb_messages_consumed: 1000
tps: 95 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000

get_demo.py updated version using syncpoint and batch commit

#!/usr/bin/env python3

import pymqi
import time
import os

# Benchmark consumer, syncpoint variant: gets `nb_messages` messages under
# syncpoint, committing once every `commit_batch` messages, and reports the
# achieved throughput.
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = f'{host}({port})'
nb_messages = 1000
commit_batch = 10
nb_messages_consumed = 0

# The timer deliberately covers connect/disconnect as well as the gets.
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
try:
    queue = pymqi.Queue(qmgr, queue_name)
    gmo = pymqi.GMO(
        Options=pymqi.CMQC.MQGMO_WAIT
        | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING
        | pymqi.CMQC.MQGMO_SYNCPOINT
    )
    gmo.WaitInterval = 1000  # milliseconds to block waiting for a message

    while nb_messages_consumed < nb_messages:
        try:
            queue.get(None, None, gmo)
        except pymqi.MQMIError as e:
            if e.reason != pymqi.CMQC.MQRC_NO_MSG_AVAILABLE:
                # Anything other than "queue empty" (2033) is a real failure.
                # Swallowing it would make this loop spin forever.
                raise
            # The wait timed out on an empty queue; that's OK, poll again.
            continue
        nb_messages_consumed += 1
        # The commit sits outside the try above so that a failed commit
        # propagates instead of being silently ignored.
        if nb_messages_consumed % commit_batch == 0:
            qmgr.commit()

    # Commit the trailing partial batch explicitly when nb_messages is not a
    # multiple of commit_batch, rather than relying on disconnect behaviour.
    if nb_messages_consumed % commit_batch != 0:
        qmgr.commit()

    queue.close()
finally:
    # Always drop the connection, even if something above raised.
    qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")

谢谢。

4

0 回答 0