在 activemq 中,我正在做一些可能是错误的事情。我正在创建一个消费者并在使用消息时设置一个 messageListener (consumer.setMessageListener(MessageListener))。这会导致 activeMQ 死锁。
这是我正在做的代码示例:
class DriverClass extends MessageListener
{
Destination destination = session.createTopic("Some.Topic");
MessageListener msgSub = new MyClass(destination);
Session session = passedInSession;
MessageConsumer consumer;
try
{
consumer = session.createConsumer(destination);
msgSub.setConsumer( consumer );
if(msgSub.getMessageListener() != null)
{
consumer.setMessageListener( msgSub );
}
}
catch (Exception ex)
{
LOGGER.error( "Error subscribing to "+msgSub.getTopic()
+" Error message: ", ex );
}
}
class MyClass extends MessageListener
{
Destination dest;
public MyClass(Destination destToUse)
{
dest = destToUse;
}
@Override
public void onMessage( final Message message )
{
//note here we are using the same topic
MessageListener msgSub = new MyMessageListener("Some.Topic");
MessageConsumer consumer;
try
{
//note here we are using the same dest as above
consumer = session.createConsumer(dest);
msgSub.setConsumer( consumer );
if(msgSub.getMessageListener() != null)
{
//this is where it deadlocks subscribing while consuming a message
//on the same topic/destination
consumer.setMessageListener( msgSub );
}
}
catch (Exception ex)
{
LOGGER.error( "Error subscribing to "+msgSub.getTopic()
+" Error message: ", ex );
}
}
}
基本上,我在 ActiveMQ 线程内的消费者中订阅。可以在这里找到对这个问题的唯一参考:https ://issues.apache.org/jira/browse/AMQ-336但我不知道解决方法是什么。现在我只是将订阅调用放在工作线程上并将控制权返回给 ActiveMQ,但似乎有更好的解决方案。是我在处理同一主题/目标的消息时订阅主题/目标的问题吗?