运行下面的代码时,我发现可以从队列中取到消息,但回调函数(callback)始终没有被触发:
from kombu.mixins import ConsumerMixin
from kombu import Exchange, Queue
# Direct exchange that the publisher sends to; routing key 'nginx' selects
# the queue below.
task_exchange = Exchange('nginx', type='direct')

# The queue needs an explicit name. Without one the broker hands out a fresh
# server-generated queue every time it is declared, so the worker never
# consumes from the queue that already holds the messages.
# NOTE(review): 'nginx' is assumed to be the name of the queue the messages
# are sitting in -- confirm against the publisher / broker configuration.
task_queues = [Queue('nginx', exchange=task_exchange, routing_key='nginx')]
class Worker(ConsumerMixin):
    """Consumer that prints and acknowledges every message on ``task_queues``."""

    def __init__(self, connection):
        """Store the broker connection that ``run()`` consumes from.

        :param connection: an open kombu connection.
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the consumers to start: one, bound to ``task_queues``.

        :param Consumer: consumer factory supplied by the mixin.
        :param channel: channel supplied by the mixin (unused here).
        """
        return [Consumer(queues=task_queues, callbacks=[self.task])]

    def task(self, body, message):
        """Callback invoked for each received message.

        :param body: decoded message payload.
        :param message: the message object; acknowledged after printing.
        """
        # Function-call form works on Python 2 and 3 alike; the bare
        # ``print body`` statement is a syntax error on Python 3.
        print(body)
        message.ack()
if __name__ == '__main__':
    from kombu import Connection

    # Open the broker connection for the lifetime of the worker; the context
    # manager closes it when run() returns or raises.
    with Connection('amqp://test:test@localhost:5672/test') as conn:
        Worker(conn).run()
我尝试用 python -m pdb test.py 进行调试:
170 -> def run(self, _tokens=1):
171 restart_limit = self.restart_limit
172 errors = (self.connection.connection_errors +
173 self.connection.channel_errors)
174 while not self.should_stop:
175 try:
(Pdb) l
176 if restart_limit.can_consume(_tokens):
177 for _ in self.consume(limit=None): # pragma: no cover
178 pass
179 else:
180 sleep(restart_limit.expected_time(_tokens))
181 except errors:
182 warn(W_CONN_LOST, exc_info=1)
程序一直停留在下面这个循环中:
for _ in self.consume(limit=None): # pragma: no cover
pass