2

我正在为 rabbitmq 使用最新的 pika 库(0.9.9+)。我对 rabbitmq 和 pika 的用法如下:

  1. 作为工人,我有长时间运行的任务(大约 5 分钟)。这些任务从rabbitmq 获取它们的请求。请求很少出现,即请求之间有很长的空闲时间。
  2. 我之前面临的问题与空闲连接(由于空闲连接导致的连接关闭)有关。所以,我在 pika 中启用了心跳。
  3. 现在心跳的选择是个问题。Pika 似乎是一个单线程库,其中心跳接收和确认恰好在请求时间范围之间完成。
  4. 因此,如果设置的心跳间隔小于回调函数用于执行其长时间运行计算的时间,则服务器不会收到任何心跳确认并关闭连接。
  5. 所以,我假设最小心跳间隔应该是阻塞连接中回调函数的最大计算时间。

对于亚马逊 ec2 来说,什么是好的心跳值可以防止它关闭空闲连接?

此外,有些人建议使用 rabbitmq keepalive(或 libkeepalive)来维护 tcp 连接。我认为在 tcp 层管理心跳要好得多,因为应用程序不需要管理它们。这是真的吗?与 RMQ 心跳相比,keepalive 是一个好方法吗?

我看到有些人建议使用多个线程和队列来处理长时间运行的任务。但这是长时间运行任务的唯一选择吗?在这种情况下必须使用另一个队列是非常令人失望的。

先感谢您。我想我已经详细说明了这个问题。让我知道我是否可以提供更多详细信息。

4

1 回答 1

4

如果您不喜欢使用 pika,这个线程帮助我实现了您使用 kombu 尝试做的事情:

#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after

eventlet.monkey_patch()

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


def long_running_function(body):
    time.sleep(300)

def job_worker(body, message):
    long_running_function(body)
    message.ack()

def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
       connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
        return
    interval = connection.heartbeat
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)
    return spawn_after(interval, heartbeat_check)

def main():
    setup_logging(loglevel='INFO')

    # process for heartbeat monitor
    p = None

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")

if __name__ == "__main__":
    main()

我剥离了我的领域特定代码,但本质上这是我使用的框架。

于 2013-05-13T16:39:49.550 回答