I am looking for an ActiveMQ advisory, or any alternative mechanism, that notifies me when the MessageListener associated with a Consumer has finished processing a message.

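The MessageDelivered advisory seems to fire as soon as the broker receives the message. The MessageConsumed advisory, in turn, claims to fire when the consumer receives the message.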

------------ Update ------------

Please find the code snippets below:

public class SampleListener implements MessageListener {

    private Session session;

    public SampleListener(Session session) {
        this.session = session; 
    }

    public void onMessage(Message message) {
        try {
             // do something
             session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class SampleConsumer {

    private boolean stopConnection = false;

    public static void main(String[] args) {
        new SampleConsumer().start();
    }

    public void start() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Destination destination = session.createTopic("test");
                MessageConsumer messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new SampleListener(session));

                try {
                    synchronized (this) {
                        while (!stopConnection) {
                            wait();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    session.close();
                    connection.close();
                }

            } catch (JMSException e) {
                e.printStackTrace();
            }
    }

    public void stop() {
        synchronized (this) {
            stopConnection = true;
            notify();
        }
    }
}


public class SampleProducer implements MessageListener {

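    // Connection used by the test; it was referenced below without a declaration.
    private Connection producerConnection;
    private boolean messageDelivered;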

    @Test
    public void shouldTestSomething() throws JMSException, InterruptedException {
        producerConnection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        producerConnection.start();
        Session session = producerConnection.createSession(true, Session.SESSION_TRANSACTED);

        Destination destination = session.createTopic("test");
        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(destination));
        advisoryConsumer.setMessageListener(this);

        Message message = session.createTextMessage("Hi");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(message);
        session.commit();

        synchronized (this) {
            while (!messageDelivered) {
                wait();
            }
        }

        session.close();
        producerConnection.close();

        // some assertions
    }

    public void onMessage(Message message) {
        // do something

        synchronized (this) {
            messageDelivered = true;
            notify();
        }
    }
}

1 Answer

Some advisories are not enabled by default. See:

http://activemq.apache.org/advisory-message.html

Disabled advisories can be enabled by adding a policyEntry to activemq.xml (http://activemq.apache.org/xml-configuration.html).

Add the following to activemq.xml:

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" advisoryForConsumed="true" />
          <policyEntry queue=">" advisoryForConsumed="true" />
          ..
        </policyEntries>
      </policyMap>
    </destinationPolicy>

Once the advisory is enabled, it is delivered after the consumer calls session.commit().
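If the test should not hang forever when the advisory never arrives, one option is to replace the wait()/notify() pair with a CountDownLatch and a timeout. The following is only a minimal sketch using java.util.concurrent; the class AdvisoryLatch and its method names are hypothetical, and a plain thread stands in for the JMS delivery thread. In the real test, advisoryReceived() would be called from the advisory listener's onMessage(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of ActiveMQ): a bounded replacement for wait()/notify().
class AdvisoryLatch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Intended to be called from the advisory listener's onMessage(...).
    public void advisoryReceived() {
        latch.countDown();
    }

    // Returns true if the advisory arrived within the timeout, false otherwise.
    public boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that nothing arrived.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AdvisoryLatch advisory = new AdvisoryLatch();

        // Stand-in for the JMS delivery thread invoking the listener.
        Thread listenerThread = new Thread(advisory::advisoryReceived);
        listenerThread.start();

        boolean consumed = advisory.await(5, TimeUnit.SECONDS);
        System.out.println("advisory received: " + consumed);
    }
}
```

With this in place the test can assert on the boolean result, so a missing advisory shows up as a failed assertion rather than a blocked thread.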

If you are using an embedded broker, you can simply put activemq.xml on the classpath and start the broker with:

BrokerService broker = BrokerFactory.createBroker("xbean:activemq.xml",true);

(I did not find any way to enable the disabled advisories without using activemq.xml.)

answered 2012-11-05T11:52:17.410