我成功地将消息发送到ReceiverQueue
本地 Jboss 服务器上的队列名称,如何检索我发送给它的消息,或者如何检查队列中是否有任何消息(如果有的话)。或者我能得到某种解释,什么是最好的方法。谢谢
一个有效的发送/接收教程也将被接受。任何能让我发送到队列并从该队列接收消息的东西都会得到接受的答案。
我正在使用弹簧。
我想要一个解决方案,它使用带有 bean 注入的应用程序上下文..
我成功地将消息发送到ReceiverQueue
本地 Jboss 服务器上的队列名称,如何检索我发送给它的消息,或者如何检查队列中是否有任何消息(如果有的话)。或者我能得到某种解释,什么是最好的方法。谢谢
一个有效的发送/接收教程也将被接受。任何能让我发送到队列并从该队列接收消息的东西都会得到接受的答案。
我正在使用弹簧。
我想要一个解决方案,它使用带有 bean 注入的应用程序上下文..
标准 JMS API 步骤:
1. 使用服务器的访问详细信息创建 javax.naming.Context
context = new InitialContext(environment)
2. 在上下文中查找 javax.jms.QueueConnectionFactory。工厂名称特定于 JMS 服务器
factory = (QueueConnectionFactory)context.lookup(factoryName)
3. 创建一个 javax.jms.QueueConnection
connection = factory.createQueueConnection(...)
4.创建一个javax.jms.QueueSession
session = connection.createQueueSession(...)
5. 在上下文中查找您的 javax.jms.Queue
queue = (Queue) context.lookup(qJndiName)
到现在和发送一样......
6. 创建一个带有会话的 javax.jms.QueueReceiver
receiver = session.createReceiver(queue)
7. JMS API 提供 2 种检索消息的方法: 7.a 使用其中一种方法
等待消息
7.b 在您的类中实现 javax.jms.MessageListener 并将其注册为侦听器receiver.receive()
receiver.setMessageListener(this)
onMessage()
每当有新消息到达时,JMS API 都会调用您的方法
8. 不要忘记启动监听器:
connection.start()
9. 关闭上下文(非常重要,当您从同一个程序访问多个 JMS 服务器时):
context.close()
以上是来自独立应用程序的典型解决方案。在 EJB 环境中,您应该使用消息驱动的 bean。您可以在http://java.sun.com/javaee/6/docs/tutorial/doc/gipko.html和http://schuchert.wikispaces.com/EJB3+Tutorial+5+上找到有关它们的 ino -+消息+驱动+豆
这是您要求的工作示例:
import java.util.Hashtable;
import javax.naming.*;
import javax.jms.*;
public class JMSJNDISample implements MessageListener {
public static final String JNDI_URL = "jnp://localhost:1099";
public static final String JNDI_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
public static final String JMS_USER = null;
public static final String JMS_PASSWORD = null;
public static final String JMS_CONNECTION_FACTORY = "MyConnectionFactory";
public static final String QUEUE_JNDI_NAME = "ReceiverQueue";
QueueConnection qConn = null;
QueueSession qSession = null;
QueueSender qSender = null;
QueueReceiver qReceiver = null;
public JMSJNDISample () {
}
public void init() throws JMSException, NamingException {
// Set up JNDI Context
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_CONTEXT_FACTORY);
env.put(Context.PROVIDER_URL, JNDI_URL);
if (JMS_USER != null)
env.put(Context.SECURITY_PRINCIPAL, JMS_USER);
if (JMS_PASSWORD != null)
env.put(Context.SECURITY_CREDENTIALS, JMS_PASSWORD);
Context jndiContext = new InitialContext(env);
// Lookup queue connection factory
QueueConnectionFactory cFactory = (QueueConnectionFactory)jndiContext.lookup(JMS_CONNECTION_FACTORY);
// Create Connection
if (JMS_USER == null || JMS_PASSWORD == null)
qConn = cFactory.createQueueConnection();
else {
qConn = cFactory.createQueueConnection(JMS_USER, JMS_PASSWORD);
}
// Create Session
qSession = qConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// Lookup Queue
Queue queue = (Queue) jndiContext.lookup(QUEUE_JNDI_NAME);
// Create Queue Sender
qSender = qSession.createSender(queue);
// Create Queue Receiver
qReceiver = qSession.createReceiver(queue);
qReceiver.setMessageListener(this);
// Start receiving messages
qConn.start();
// Close JNDI context
jndiContext.close();
}
public void sendMessage (String str) throws JMSException {
TextMessage msg = qSession.createTextMessage(str);
qSender.send(msg);
}
public void onMessage (Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
System.out.println("Text Message Received: "+textMessage.getText());
} else {
System.out.println(message.getJMSType()+" Message Received");
}
} catch (JMSException je) {
je.printStackTrace();
}
}
public void destroy() throws JMSException {
if (qSender != null) qSender.close();
if (qReceiver != null) qReceiver.close();
if (qSession != null) qSession.close();
if (qConn != null) qConn.close();
}
public static void main(String args[]) {
try {
JMSJNDISample sample = new JMSJNDISample();
// Initialize connetion
sample.init();
// Send Message
sample.sendMessage("Hello World");
// Wait 2 sec for answer
Thread.sleep(2000);
// Disconnect
sample.destroy();
} catch (Exception e) {
e.printStackTrace();
}
}
}
除了让 MessageDrivenBean 监听该队列之外?
编辑:您使用 spring 只是为了创建有效负载,对吗?JMS 是 JavaEE 规范。您不需要使用 Spring 来实际发送/接收消息。您也不必手动检查队列中是否有消息等。您需要做的就是像这样设置一个 MDB(MessageDrivenBean),
@MessageDriven(activationConfig = {
@ActivationConfigProperty(
propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(
propertyName = "destination", propertyValue = "queue/myqueue")
})
public class MyMessageDrivenBean implements MessageListener {
public void onMessage(Message message) {
ObjectMessage objMsg = (ObjectMessage) message;
Payload payload = (Payload)objMsg.getObject();
//do stuff
}
}
然后发送一些 JMS 消息。
@Stateless
public class QueuerBean implements QueuerLocal {
@Resource(mappedName = "java:/JmsXA")
private ConnectionFactory jmsConnectionFactory;
@Resource(mappedName = "queue/myqueue")
private Queue queue;
private void queue(MyPayload payload) {
try {
Connection connect = jmsConnectionFactory.createConnection();
Session session = connect.createSession(false,
Session.DUPS_OK_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
// create a JMS message and send it
ObjectMessage objMsg = session.createObjectMessage(payload);
producer.send(objMsg);
producer.close();
session.close();
connect.close();
} catch (JMSException e) {
log.error("Bad thing happened", e);
}
}
}
队列由注解配置。当有消息发送时,JBoss 会自动触发 MDB。
这是一个示例,展示了如何在 Spring 中设置消息驱动的 POJO。如果您已经在使用 Spring,我建议您遵循这个习惯用法。
至于查看队列中有多少消息的部分,我想说您应该使用 JBOSS 的管理控制台,而不是您的代码。
我还建议使用 HermesJMS(http://www.hermesjms.com/confluence/display/HJMS/Home)之类的工具来检查队列管理器和队列。这是一个很棒的调试工具。