1

运行以下代码时,我发现可以从队列中取到消息,但回调函数(callback)始终没有被触发:

from kombu.mixins import ConsumerMixin
from kombu import Exchange, Queue

# Direct exchange named 'nginx'.
task_exchange = Exchange(name='nginx', type='direct')

# Queues the worker consumes from: a single queue bound to the exchange
# above with routing key 'nginx'.
task_queues = [
    Queue(exchange=task_exchange, routing_key='nginx'),
]

class Worker(ConsumerMixin):
    """Consumer that prints and acknowledges messages from ``task_queues``."""

    def __init__(self, connection):
        """Store the broker connection.

        ``ConsumerMixin.run()`` reads it back through ``self.connection``.
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a single consumer on ``task_queues`` that calls ``task``.

        ``Consumer`` is the consumer factory handed in by the mixin;
        ``channel`` is not used here.
        """
        return [Consumer(queues=task_queues,
                         callbacks=[self.task])]

    def task(self, body, message):
        """Print the message body, then acknowledge the message."""
        # Parenthesised form prints the same on Python 2 and parses on
        # Python 3, where the bare ``print body`` statement is a syntax error.
        print(body)
        message.ack()
if __name__ == '__main__':
    from kombu import Connection

    # Broker URL: user "test", password "test", vhost "test" on localhost.
    broker_url = 'amqp://test:test@localhost:5672/test'

    # The connection is released when the ``with`` block exits.
    with Connection(broker_url) as conn:
        Worker(conn).run()

我尝试用 python -m pdb test.py 进行调试:

170  ->     def run(self, _tokens=1):
171             restart_limit = self.restart_limit
172             errors = (self.connection.connection_errors +
173                       self.connection.channel_errors)
174             while not self.should_stop:
175                 try:
(Pdb) l
176                     if restart_limit.can_consume(_tokens):
177                         for _ in self.consume(limit=None):  # pragma: no    cover
178                             pass
179                     else:
180                         sleep(restart_limit.expected_time(_tokens))
181                 except errors:
182                     warn(W_CONN_LOST, exc_info=1)

程序一直停留在下面这个循环中:

for _ in self.consume(limit=None):  # pragma: no    cover
           pass
4

1 回答 1

1

用 python -m pdb test.py 调试,跟踪到 connection.drain_events() 内部,发现 content.body 是二进制编码的,下面的解码步骤在这里出错:

                if (content and
309                     channel.auto_decode and
310                     hasattr(content, 'content_encoding')):
311  ->             try:
312                     content.body = content.body.decode(content.content_encoding)   #here get a error
313                 except Exception:
314                     pass

修复方法:在创建 Consumer 时通过 accept 参数声明可接受的内容类型:

def get_consumers(self, Consumer, channel):
    """Return a single consumer on ``task_queues`` that calls ``self.task``.

    The consumer declares 'json' and 'pickle' as the content types it
    accepts. ``channel`` is not used here.
    """
    # NOTE(review): accepting 'pickle' means pickled payloads from the
    # broker will be deserialized; presumably only safe when every
    # publisher is trusted -- confirm before using outside a test setup.
    accepted_types = ['json', 'pickle']
    consumer = Consumer(
        queues=task_queues,
        accept=accepted_types,
        callbacks=[self.task],
    )
    return [consumer]
于 2016-05-20T07:50:24.520 回答