2

在 activemq 中,我正在做一些可能是错误的事情。我正在创建一个消费者并在使用消息时设置一个 messageListener (consumer.setMessageListener(MessageListener))。这会导致 activeMQ 死锁。

这是我正在做的代码示例:

class DriverClass extends MessageListener 
{
    Destination destination = session.createTopic("Some.Topic");
    MessageListener msgSub = new MyClass(destination);
    Session session = passedInSession;


    MessageConsumer consumer;
    try
    {        
        consumer = session.createConsumer(destination);

        msgSub.setConsumer( consumer );

        if(msgSub.getMessageListener() != null)
        {
            consumer.setMessageListener( msgSub );
        }
    }
    catch (Exception ex)
    {
        LOGGER.error( "Error subscribing to "+msgSub.getTopic()
                +" Error message: ", ex );
    }
}


class MyClass extends MessageListener
{ 
    Destination dest;
    public MyClass(Destination destToUse)
    {
        dest = destToUse;
    }

    @Override
    public void onMessage( final Message message )
    {
        //note here we are using the same topic
    MessageListener msgSub = new MyMessageListener("Some.Topic");

        MessageConsumer consumer;
        try
        {           
            //note here we are using the same dest as above
            consumer = session.createConsumer(dest);

            msgSub.setConsumer( consumer );

            if(msgSub.getMessageListener() != null)
            {
                    //this is where it deadlocks subscribing while consuming a message
                    //on the same topic/destination
                consumer.setMessageListener( msgSub );
            }
        }
        catch (Exception ex)
        {
            LOGGER.error( "Error subscribing to "+msgSub.getTopic()
                    +" Error message: ", ex );
        }
    }
}

基本上,我在 ActiveMQ 线程内的消费者中订阅。可以在这里找到对这个问题的唯一参考:https ://issues.apache.org/jira/browse/AMQ-336但我不知道解决方法是什么。现在我只是将订阅调用放在工作线程上并将控制权返回给 ActiveMQ,但似乎有更好的解决方案。是我在处理同一主题/目标的消息时订阅主题/目标的问题吗?

4

0 回答 0