我正在尝试处理生产者端的流量控制情况。我在 qpid-broker 上有一个队列,其中设置了最大队列大小。在队列上还设置了 flow_stop_count 和 flow_resume_count。
现在生产者继续不断地产生消息,直到达到这个 flow_stop_count。违反此计数时,将引发由异常侦听器处理的异常。现在某个时候,队列中的消费者将赶上并达到 flow_resume_count。问题是制作人是如何知道这个事件的。
这是生产者的示例代码
connection connection = connectionFactory.createConnection();
connection.setExceptionListenr(new MyExceptionListerner());
connection.start();
Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup("Test");
MessageProducer producer = session.createProducer(queue);
while(notStopped){
while(suspend){//---------------------------how to resume this flag???
Thread.sleep(1000);
}
TextMessage message = session.createTextMessage();
message.setText("TestMessage");
producer.send(message);
}
session.close();
connection.close();
对于异常监听器
private class MyExceptionListener implements ExceptionListener {
public void onException(JMSException e) {
System.out.println("got exception:" + e.getMessage());
suspend=true;
}
}
现在 exceptionlistener 是一个通用的异常监听器,所以暂停生产者流通过它应该不是一个好主意。
我需要的可能是生产者级别的一些方法,比如produer.isFlowStopped(),我可以在发送消息之前使用它来检查。qpid api 中是否存在这样的功能。
qpid 网站上有一些文档表明可以这样做。但我在任何地方都找不到任何这样做的例子。
是否有一些标准的方法来处理这种情况。