1

我在为新的 SO 文档项目记录 Kombu 时偶然发现了这个问题。

考虑以下消费者 Mixin的 Kombu 代码:

from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin
from kombu.exceptions import MessageStateError
import datetime

# Send a message to the 'test_queue' queue
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue(name='test_queue') as queue:
        queue.put('String message sent to the queue')


# Callback functions
def print_upper(body, message):
    print body.upper()
    message.ack()    

def print_lower(body, message):
    print body.lower()
    message.ack()


# Attach the callback function to a queue consumer 
class Worker(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(queues=Queue('test_queue'), callbacks=[print_even_characters, print_odd_characters]),
        ]

# Start the worker
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    worker = Worker(conn)
    worker.run()

代码失败:

kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

因为消息被 ACK 了两次, onprint_even_characters()print_odd_characters()

一个可行的简单解决方案是仅确认最后一个回调函数,但如果我想在其他队列或连接上使用相同的函数,它会破坏模块化。

如何确认发送到多个回调函数的排队 Kombu 消息?

4

1 回答 1

1

解决方案

1 - 检查message.acknowledged

message.acknowledged标志检查消息是否已经被确认:

def print_upper(body, message):
    print body.upper()
    if not message.acknowledged: 
        message.ack()


def print_lower(body, message):
    print body.lower()
    if not message.acknowledged: 
        message.ack()

优点:可读,简短。

缺点:打破Python EAFP 成语

2 - 捕捉异常

def print_upper(body, message):
    print body.upper()
    try:
        message.ack()
    except MessageStateError:
        pass


def print_lower(body, message):
    print body.lower()
    try:
        message.ack()
    except MessageStateError:
        pass

优点:可读,Pythonic。

缺点:有点长 - 每个回调有 4 行样板代码。

3 - 确认最后一个回调

文档保证按顺序调用回调。因此,我们可以简单地.ack()只使用最后一个回调:

def print_upper(body, message):
    print body.upper()


def print_lower(body, message):
    print body.lower()
    message.ack()

优点:简短、易读、没有样板代码。

缺点:不是模块化的:回调不能被另一个队列使用,除非最后一个回调总是最后一个。这种隐含的假设可能会破坏调用者代码。

这可以通过将回调函数移动到Worker类中来解决。我们放弃了一些模块化——这些函数不会从外部调用——但获得了安全性和可读性。

概括

1 和 2 之间的区别只是风格问题。

如果执行顺序很重要,以及消息是否在成功通过所有回调之前不应该被 ACK-ed,则应该选择解决方案 3。

如果应该始终确认消息,则应该选择 1 或 2,即使一个或多个回调失败。

请注意,还有其他可能的设计;这个答案是指驻留在工作人员之外的回调函数。

于 2016-08-22T05:55:05.570 回答