嗨,有人知道如何使用 IBM MQ 创建消息侦听器吗?我知道如何使用 JMS 规范来做到这一点,但我不确定如何为 IBM MQ 做到这一点。非常感谢任何链接或指针。
7 回答
尽管前面的响应者提到了 WMQ Java API,但 WMQ 也支持 JMS,因此这里有一些资源可以帮助您入门。
看看这篇文章:IBM WebSphere Developer Technical Journal:在 WebSphere MQ V6.0 上运行独立的 Java 应用程序
此外,如果您安装了完整的 WMQ 客户端,而不仅仅是获取 jar,那么您将安装大量示例代码。默认情况下,这些将位于 C:\Program Files\IBM\WebSphere MQ\tools\jms 或 /opt/mqm/samp 中,具体取决于您的平台。
如果您需要 WMQ 客户端安装媒体,请在此处获取。请注意,这是 WMQ v7 客户端,而不是 v6 客户端。它与 v6 QMgr 兼容,但由于 v6 已于 2011 年 9 月结束生命周期,因此您应该在 v7 客户端上进行新的开发,如果可能的话,在 v7 QMgr 上进行。如果双方都是 v7,则有很多可用的功能和性能增强。
如果需要,您可以在此处获取产品手册。
最后,请确保在收到 JMS 异常时打印链接的异常。这不是 WMQ 的事情,而是 JMS 的事情。Sun 为 JMS 异常提供了多级数据结构,真正有趣的部分通常位于嵌套级别。这没什么大不了的,可以用几行代码实现:
try {
.
. code that might throw a JMSException
.
} catch (JMSException je) {
System.err.println("caught "+je);
Exception e = je.getLinkedException();
if (e != null) {
System.err.println("linked exception: "+e);
} else {
System.err.println("No linked exception found.");
}
}
这有助于确定 JMS 错误与传输错误之间的区别。例如,JMS 安全错误可能是 WMQ 2035,或者可能是 JSSE 配置,或者应用程序可能无法访问文件系统中的某些内容。其中只有一个值得花费大量时间挖掘 WMQ 错误日志,并且只有通过打印链接的异常,您才能判断它是否是那个。
查看 IBM 帮助:编写 WebSphere MQ 基础 Java 应用程序
IBM 有一个用于与队列交互的 API。这是他们的样本:
import com.ibm.mq.*; // Include the WebSphere MQ classes for Java package
public class MQSample
{
private String qManager = "your_Q_manager"; // define name of queue
// manager to connect to.
private MQQueueManager qMgr; // define a queue manager
// object
public static void main(String args[]) {
new MQSample();
}
public MQSample() {
try {
// Create a connection to the queue manager
qMgr = new MQQueueManager(qManager);
// Set up the options on the queue we wish to open...
// Note. All WebSphere MQ Options are prefixed with MQC in Java.
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
MQC.MQOO_OUTPUT ;
// Now specify the queue that we wish to open,
// and the open options...
MQQueue system_default_local_queue =
qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
openOptions);
// Define a simple WebSphere MQ message, and write some text in UTF format..
MQMessage hello_world = new MQMessage();
hello_world.writeUTF("Hello World!");
// specify the message options...
MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults,
// same as MQPMO_DEFAULT
// put the message on the queue
system_default_local_queue.put(hello_world,pmo);
// get the message back again...
// First define a WebSphere MQ message buffer to receive the message into..
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.messageId = hello_world.messageId;
// Set the get message options...
MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
// same as MQGMO_DEFAULT
// get the message off the queue...
system_default_local_queue.get(retrievedMessage, gmo);
// And prove we have the message by displaying the UTF message text
String msgText = retrievedMessage.readUTF();
System.out.println("The message is: " + msgText);
// Close the queue...
system_default_local_queue.close();
// Disconnect from the queue manager
qMgr.disconnect();
}
// If an error has occurred in the above, try to identify what went wrong
// Was it a WebSphere MQ error?
catch (MQException ex)
{
System.out.println("A WebSphere MQ error occurred : Completion code " +
ex.completionCode + " Reason code " + ex.reasonCode);
}
// Was it a Java buffer space error?
catch (java.io.IOException ex)
{
System.out.println("An error occurred whilst writing to the message buffer: " + ex);
}
}
} // end of sample
我不确定 IBM jars 是否位于基础 Maven 存储库中。我知道过去我不得不从本地 IBM 安装中提取它们并将它们放入本地 SVN 存储库中。我正在使用以下罐子:
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mq</artifactId>
<version>5.3.00</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mq.pcf</artifactId>
<version>5.3.00</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mqbind</artifactId>
<version>5.3.00</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mqjms</artifactId>
<version>5.3.00</version>
<scope>compile</scope>
</dependency>
在获取消息之前的循环中,您可以指定如下
gmo.options = MQC.MQGMO_WAIT
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;
这使得循环将一直等待,直到队列中有消息。对我来说,它类似于MessageListerner
看看上面提供的示例。
特别是在线路
MQGetMessageOptions gmo = new MQGetMessageOptions();
system_default_local_queue.get(retrievedMessage, gmo);
您可以将 get 配置为在引发 MQRC_NO_MSG_AVAILABLE 异常之前等待指定的时间。或者你可以永远等待。
gmo.waitInterval= qTimeout;
gmo.options = MQC.MQGMO_WAIT
因此,您可以创建一个线程来不断寻找新消息,然后将它们传递给处理程序。获取和放置不需要在同一个线程甚至应用程序中。
我希望这有助于回答你的问题。
以防万一有人会像我一样为 MQ 侦听器搜索堆栈溢出...由于 JMS 实现,这可能不是答案,但这就是我想要的。像这样的东西:
MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection();
MQQueueSession session = (MQQueueSession)conn.createSession(false, 1);
Queue queue = session.createQueue("QUEUE");
MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue);
receiver.setMessageListener(new YourListener());
conn.start();
YourListener 应该实现 MessageListener 接口,您将在 onMessage(Message msg) 方法中接收消息。
您好,这是使用 IBM MQ 的消息侦听器的工作示例。在这里,我还使用 spring 来创建 bean 等......
package queue.app;
import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@Component
public class QueueConsumer implements MessageListener{
private Logger logger = Logger.getLogger(getClass());
MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
QueueConnection qc;
Queue queue;
QueueSession queueSession;
QueueReceiver qr;
@Value("${jms.hostName}")
String jmsHost;
@Value("${jms.port}")
String jmsPort;
@Value("${jms.queue.name}")
String QUEUE_NAME;
@Value("${jms.queueManager}")
String jmsQueueMgr;
@Value("${jms.username}")
String jmsUserName;
@Value("${jms.channel}")
String jmsChannel;
@PostConstruct
public void init() throws Exception{
qcf.setHostName (jmsHost);
qcf.setPort (Integer.parseInt(jmsPort));
qcf.setQueueManager (jmsQueueMgr);
qcf.setChannel (jmsChannel);
qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
qc = qcf.createQueueConnection ();
queue = new MQQueue(QUEUE_NAME);
qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
qr = queueSession.createReceiver(queue);
qr.setMessageListener(this);
qc.start();
}
@Override
public void onMessage(Message message) {
logger.info("Inside On Message...");
long t1 = System.currentTimeMillis();
logger.info("Message consumed at ...."+t1);
try{
if(message instanceof TextMessage) {
logger.info("String message recieved >> "+((TextMessage) message).getText());
}
}catch(Exception e){
e.printStackTrace();
}
}
}
以下是我的依赖项..
<dependency>
<groupId>com.sun.messaging.mq</groupId>
<artifactId>fscontext</artifactId>
<version>4.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>jms</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.17.RELEASE</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mq</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mq.jmqi</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.ibm</groupId>
<artifactId>com.ibm.mqjms</artifactId>
<version>1.0</version>
</dependency>
除了现有答案之外,还有一点很重要:JMS 提供MessageListener
了一个类,它允许您以异步回调的形式接收消息。
本机 API没有等效功能!您必须酌情反复调用get(...)
。