5

我有一个队列类型为 SYS.AQ$_JMS_TEXT_MESSAGE 的 Oracle AQ。我想要做的是从java应用程序将文本插入到提到的队列中。

等效的 SQL 查询是

declare
 r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 v_message_handle     RAW(16);
 o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());
 sys.dbms_aq.enqueue (
   queue_name         => 'QUEUE_NAME',
   enqueue_options    => r_enqueue_options,
   message_properties => r_message_properties,
   payload            => o_payload,
   msgid              => v_message_handle
 );
 commit;
end;
/

我使用本指南得到了大部分内容,但我被困在

 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());

该指南显示了如何将 RAW 消息排入队列,但我需要它是 JMS,否则数据类型与队列类型不匹配。

任何帮助将不胜感激,因为即使使用全能的谷歌我也无法找到解决这个问题的方法。有没有办法使用oracle.jdbc.aq类来做到这一点,还是我只需要吸收它并使用 SQL 查询?

4

2 回答 2

11

只需复制粘贴此代码并尝试。(如果它适合你)然后仔细阅读代码,并理解。

执行时,

  • 首先取消注释 main 方法中的“ createQueue() ”行。

在那之后,

  • 评论它并取消评论' sendMessage() '行并尝试发送您的消息。

然后分别注释/取消注释每一行并尝试一下。

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;

import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

public static QueueConnection getConnection() {

    String hostname = "localhost";
    String oracle_sid = "xe";
    int portno = 1521;
    String userName = "jmsuser";
    String password = "jmsuser";
    String driver = "thin";
    QueueConnectionFactory QFac = null;
    QueueConnection QCon = null;
    try {
        // get connection factory , not going through JNDI here
        QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
        // create connection
        QCon = QFac.createQueueConnection(userName, password);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return QCon;
}

public static void createQueue(String user, String qTable, String queueName) {
    try {
        /* Create Queue Tables */
        System.out.println("Creating Queue Table...");
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        AQQueueTableProperty qt_prop;
        AQQueueTable q_table = null;
        AQjmsDestinationProperty dest_prop;
        Queue queue = null;
        qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");

        q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);

        System.out.println("Qtable created");
        dest_prop = new AQjmsDestinationProperty();
        /* create a queue */
        queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
        System.out.println("Queue created");
        /* start the queue */
        ((AQjmsDestination) queue).start(session, true, true);

    } catch (Exception e) {
        e.printStackTrace();
        return;
    }
}

public static void sendMessage(String user, String queueName,String message) {

    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageProducer producer = session.createProducer(queue);
        TextMessage tMsg = session.createTextMessage(message);

        //set properties to msg since axis2 needs this parameters to find the operation
        tMsg.setStringProperty("SOAPAction", "getQuote");
        producer.send(tMsg);
        System.out.println("Sent message = " + tMsg.getText());

        session.close();
        producer.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
        return;
    }
}

public static void browseMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration enu = browser.getEnumeration();
        List list = new ArrayList();
        while (enu.hasMoreElements()) {
            TextMessage message = (TextMessage) enu.nextElement();
            list.add(message.getText());
        }
        for (int i = 0; i < list.size(); i++) {
            System.out.println("Browsed msg " + list.get(i));
        }
        browser.close();
        session.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
    }

}

public static void consumeMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage msg = (TextMessage) consumer.receive();
        System.out.println("MESSAGE RECEIVED " + msg.getText());

        consumer.close();
        session.close();
        QCon.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

public static void main(String args[]) {
    String userName = "jmsuser";
    String queue = "sample_aq";
    String qTable = "sample_aqtbl";
    //createQueue(userName, qTable, queue);
    //sendMessage(userName, queue,"<user>text</user>");
    //browseMessage(userName, queue);
    //consumeMessage(userName, queue);
}

}

您需要将这些 jars/libs 从您的 oracle DB 安装目录复制到您的 java 项目中

  1. ojdbc6.jar
  2. jta.jar
  3. jmscommon.jar
  4. aqapi.jar

本文的学分应归功于 Ratha [1]。需要修改的东西很少,我只是修改了这些并提供了代码。

[1] http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/

谢谢

于 2014-01-28T07:53:05.393 回答
0

我将在@Chathura Kulasinghe 的答案中添加一些花絮。

首先,在 consumeMessage 方法中,使用

Session.CLIENT_ACKNOWLEDGE

创建会话对象的参数将具有将您消费的消息留在队列中的效果。如果你多次运行这个程序,你会看到队列的数据库表中消息的数量在增加。要删除消息,您需要通过在消息对象上调用此方法来“确认”它:

msg.acknowledge();

其次,如果您希望会话为您执行此操作,只需将客户端确认模式更改为:

Session.AUTO_ACKNOWLEDGE

使用此参数,每次调用您的 consumer.receive() 时,它都会自动确认,因此会从队列中删除。

于 2018-11-21T02:50:08.847 回答