如果您不喜欢使用 pika,这个线程帮助我实现了您使用 kombu 尝试做的事情:
#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after
eventlet.monkey_patch()
log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)
def long_running_function(body):
time.sleep(300)
def job_worker(body, message):
long_running_function(body)
message.ack()
def monitor_heartbeats(connection, rate=2):
"""Function to send heartbeat checks to RabbitMQ. This keeps the
connection alive over long-running processes."""
if not connection.heartbeat:
logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
return
interval = connection.heartbeat
cref = weakref.ref(connection)
logger.info("Starting heartbeat monitor.")
def heartbeat_check():
conn = cref()
if conn is not None and conn.connected:
conn.heartbeat_check(rate=rate)
logger.info("Ran heartbeat check.")
spawn_after(interval, heartbeat_check)
return spawn_after(interval, heartbeat_check)
def main():
setup_logging(loglevel='INFO')
# process for heartbeat monitor
p = None
try:
with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
conn.ensure_connection()
monitor_heartbeats(conn)
queue = Queue('job_queue',
Exchange('job_queue', type='direct'),
routing_key='job_queue')
logger.info("Starting worker.")
with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
consumer.qos(prefetch_count=1)
for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
pass
except KeyboardInterrupt:
logger.info("Worker was shut down.")
if __name__ == "__main__":
main()
我剥离了我的领域特定代码,但本质上这是我使用的框架。