2

我正在尝试处理生产者端的流量控制情况。我在 qpid-broker 上有一个队列,其中设置了最大队列大小。在队列上还设置了 flow_stop_count 和 flow_resume_count。

现在生产者继续不断地产生消息,直到达到这个 flow_stop_count。违反此计数时,将引发由异常侦听器处理的异常。现在某个时候,队列中的消费者将赶上并达到 flow_resume_count。问题是制作人是如何知道这个事件的。

这是生产者的示例代码

    connection connection = connectionFactory.createConnection();
    connection.setExceptionListenr(new MyExceptionListerner());
    connection.start();
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue)context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while(notStopped){
        while(suspend){//---------------------------how to resume this flag???
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

对于异常监听器

    private class MyExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println("got exception:" + e.getMessage());
        suspend=true;
    }
}

现在 exceptionlistener 是一个通用的异常监听器,所以暂停生产者流通过它应该不是一个好主意。

我需要的可能是生产者级别的一些方法,比如produer.isFlowStopped(),我可以在发送消息之前使用它来检查。qpid api 中是否存在这样的功能。

qpid 网站上有一些文档表明可以这样做。但我在任何地方都找不到任何这样做的例子。

是否有一些标准的方法来处理这种情况。

4

3 回答 3

2

从我从 Apache QPid 文档中读到的内容来看,flow_resume_count 和 flow_stop_count 似乎会导致生产者开始被阻止。

因此,唯一的选择是软件明智地定期轮询,直到消息再次开始流动。

从这里提取。

如果生产者发送到一个已满的队列,代理将通过指示客户端不再发送任何消息来做出响应。这样做的影响是,任何未来的发送尝试都将被阻塞,直到代理撤销流控制命令。

在阻塞时,客户端将定期记录它被阻塞等待流量控制的事实。

WARN AMQSession - Broker 强制流控制已被强制执行到调用代码。

ERROR AMQSession - 由于超时等待代理强制流控制,消息发送失败。

从该文档中可以看出,管理生产者的软件将不得不自我管理。因此,基本上,当您收到队列已满的异常时,您将需要退后并很可能轮询并重新尝试发送您的消息。

于 2012-01-21T21:11:08.787 回答
1

您可以尝试设置队列的容量(队列被认为已满的字节大小)和 flowResumeCapacity(生产者未流动的队列大小)属性。

如果大小超过容量值,则 send() 将被阻止。

您可以查看repo 中的这个测试用例文件来了解一下。

于 2012-01-21T20:55:03.527 回答
1

JMS 客户端上尚未实现生产者流控制。请参阅https://issues.apache.org/jira/browse/QPID-3388

于 2012-06-20T18:43:24.277 回答