9

我想向 RabbitMQ 服务器发送一条消息,然后等待回复消息(在“回复”队列上)。当然,我不想永远等待,以防处理这些消息的应用程序出现故障 - 需要超时。这听起来像是一项非常基本的任务,但我找不到这样做的方法。我现在在py-amqplibRabbitMQ .NET 客户端都遇到了这个问题。

到目前为止我得到的最好的解决方案是使用中间进行轮询basic_getsleep但这很丑陋:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

当然有更好的方法吗?

4

5 回答 5

9

这是我最终在 .NET 客户端中所做的事情:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

不幸的是,我不能对 py-amqplib 做同样的事情,因为它的basic_consume方法不会调用回调,除非你调用channel.wait()并且channel.wait()不支持超时!这个愚蠢的限制(我一直遇到)意味着如果您再也没有收到过消息,您的线程将永远冻结。

于 2010-05-10T01:57:48.607 回答
8

amqplib我刚刚添加了对in 的超时支持carrot

这是一个子类amqplib.client0_8.Connection

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multichannel.wait能够在任意数量的频道上接收的版本。

我想这可能会在某个时候在上游合并。

于 2010-05-10T13:57:34.900 回答
2

这里有一个使用qpid的示例,msg = q.get(timeout=1)它应该可以满足您的需求。抱歉,我不知道还有哪些其他 AMQP 客户端库实现了超时(特别是我不知道你提到的两个具体的)。

于 2010-05-10T00:36:46.907 回答
1

这似乎打破了异步处理的整个想法,但如果你必须这样做,我认为正确的方法是使用RpcClient

于 2010-05-10T00:35:01.840 回答
1

Rabbit 现在允许您添加超时事件。只需将代码包装在 try catch 中,然后在 TimeOut 和 Disconnect 处理程序中抛出异常:

try{
    using (IModel channel = rabbitConnection.connection.CreateModel())
    {
        client = new SimpleRpcClient(channel, "", "", queue);
        client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity
        client.TimedOut += RpcTimedOutHandler;
        client.Disconnected += RpcDisconnectedHandler;
        byte[] replyMessageBytes = client.Call(message);
        return replyMessageBytes;
    }
}
catch (Exception){
    //Handle timeout and disconnect here
}
private void RpcDisconnectedHandler(object sender, EventArgs e)
{
     throw new Exception("RPC disconnect exception occured.");
}

private void RpcTimedOutHandler(object sender, EventArgs e)
{
     throw new Exception("RPC timeout exception occured.");
}
于 2016-04-22T08:26:25.053 回答