23

我想在几个线程中处理消息,但在执行此代码期间出现错误:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

错误说明:

回溯(最近一次通话最后):
  文件“C:\work\projects\mq\start.py”,第 43 行,在
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 318 行,在 basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  文件“C:\work\projects\mq\libs\pika\channel.py”,第 469 行,在 basic_get
    no_ack=no_ack))
  文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 244 行,在 send_method 中
    self.connection.process_data_events()
  文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 94 行,在 process_data_events
    self._handle_read()
  _handle_read 中的文件“C:\work\projects\mq\libs\pika\adapters\base_connection.py”,第 162 行
    self._on_data_available(数据)
  _on_data_available 中的文件“C:\work\projects\mq\libs\pika\connection.py”,第 589 行
    框架)#参数
  文件“C:\work\projects\mq\libs\pika\callback.py”,第 124 行,正在进行中
    回调(*参数,**关键字)
  _on_remote_close 中的文件“C:\work\projects\mq\libs\pika\adapters\blocking_connection.py”,第 269 行
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - 未知的交付标签 204')

版本:鼠兔 0.9.5、rabbitMQ 2.6.1

4

7 回答 7

46

问题可能是你的设置no_ack=True是这样的:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

然后确认消息:

channel.basic_ack(delivery_tag=args.delivery_tag)

您必须选择是否要确认并设置正确的消耗参数。

于 2014-04-07T10:09:09.580 回答
13

对我来说,只是我告诉队列我不会确认,然后我确认了。

例如错误

channel.basic_consume(callback, queue=queue_name, no_ack=True)

然后在我的回调中:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback, queue=queue_name, no_ack=False)

底线:如果要手动确认,请设置 no_ack=False。

从文档:

no_ack:(bool)如果设置为 True,将使用自动确认模式(参见http://www.rabbitmq.com/confirms.html

于 2015-09-17T09:02:17.520 回答
4

您的代码存在错误。您跨线程共享通道。pika 不支持此功能(请参阅常见问题解答)。您有 2 个选项:

  1. 在线程函数中定义no_ack=True标志basic_get(...)并且不使用通道对象doWork(...)
  2. 如果您仅在完成工作后才需要确认消息,则让主线程(while True:循环)处理消息确认(而不是工作线程)。下面是执行此操作的代码的修改版本。

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty
    
    def doWork(body, args, channel, ack_queue):
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)
    
    def doAck(channel):
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                r = None
            if r is None:
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()
    
    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()
    
    while True:
        time.sleep(0.03)    
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue        
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
    
于 2015-05-05T10:18:46.417 回答
2

我没有修复程序,但我可以使用 BlockingConnection 适配器验证它是否发生。

它始终在确认或拒绝响应 channel.basic_recover() 重新传递的消息时发生

pika 0.9.5、rabbitMQ 2.2.0、python 2.7 和 Erlang R14B01

我的解决方法是始终指定deliver_tag=0

我怀疑这仅在您正在确​​认/确认的消息是您阅读的最后一条消息(在流中)时才有效。我正在编写的库以一种可以独立确认每个消息的方式抽象消息,这与此解决方案不同。

任何人都可以确认这是否已被 pika 团队中的任何人修复或确认?或者,这可能是 RabbitMQ 的问题吗?

于 2012-02-23T19:22:01.957 回答
0

看到RabbitMQ - 升级到新版本后得到很多“PRECONDITION_FAILED unknown delivery tag 1”

我将基本消费更改为如下所示:

    consumer_tag = channel.basic_consume(
        message_delivery_event,
        no_ack=True,
        queue=queue,
    )

当指定消息的传递标签时,这会在初始(未重新传递)确认中导致所描述的错误。传递是从消息传递的方法结构中提取的。

使用

    channel.basic_ack(delivery_tag=0)

在这种情况下也抑制错误

查看http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html看起来好像它可能是 RabbitMQ 中的一个问题。

于 2012-02-23T19:44:40.290 回答
0

如果您尝试在创建消息的不同通道上确认消息,您也可能会遇到此错误。如果您正在关闭或重新创建频道,则可能会发生这种情况。

来自文档:https ://www.rabbitmq.com/confirms.html

代理会抱怨“未知传递标签”的另一种情况是,在与接收传递不同的通道上尝试确认,无论是肯定的还是否定的。必须在同一渠道上确认交付。

于 2019-07-01T23:30:11.500 回答
0

产生此问题是因为您已设置 { noack: true },但仍在尝试发送确认。

于 2019-06-17T12:34:47.210 回答