3

我正在使用 Oracle Advanced Queuing(AQ),并且对它完全陌生。我对此有一些疑问。下面是我的代码:

package com;

/* Set up main class from which we will call subsequent examples and handle 
 exceptions: */
import java.sql.*;

import oracle.AQ.*;

/**
 * Minimal Oracle Advanced Queuing (AQ) demo: opens an AQ session over JDBC,
 * creates a RAW-payload queue table and queue in the USER schema, and
 * enqueues (optionally dequeues) one message.
 */
public class test_aqjava
{
    /**
     * Runs the demo: create session, create and start the queue, enqueue one message.
     * Any failure is printed with the "Exception-1" prefix; the AQ session is
     * closed whether or not the steps succeed.
     *
     * NOTE(review): createAqTables is called on every run; presumably it fails
     * once aq_table1 already exists, so a second run never reaches enqueueMsg --
     * confirm and skip the creation step on later runs.
     *
     * @param args forwarded to {@link #createSession(String[])}
     */
    public static void main(String args[]) 
    {
        AQSession  aq_sess = null;
        try 
        {
            aq_sess = createSession(args);
            if (aq_sess == null)
            {
                /* createSession has already printed the cause; nothing to run. */
                return;
            }
            createAqTables(aq_sess);
            enqueueMsg(aq_sess);
            /* Uncomment to read the message back: */
            // dequeueMsg(aq_sess);
        }
        catch (Exception ex)
        {
            System.out.println("Exception-1: " + ex); 
            ex.printStackTrace();      
        }
        finally
        {
            closeSession(aq_sess);
        }
    }

    /** Closes the AQ session if one was opened, reporting (not propagating) close failures. */
    private static void closeSession(AQSession aq_sess)
    {
        if (aq_sess == null)
        {
            return;
        }
        try
        {
            aq_sess.close();
        }
        catch (Exception ex)
        {
            System.out.println("Exception-1: " + ex); 
            ex.printStackTrace();      
        }
    }

    /**
     * Opens a JDBC connection (auto-commit disabled) and creates an AQ session on it.
     *
     * @param args not used by this method
     * @return the new AQ session, or {@code null} if any step failed (the failure
     *         is printed and the JDBC connection, if already opened, is closed)
     */
    public static AQSession createSession(String args[]) 
    {
        Connection db_conn = null;
        AQSession  aq_sess = null;

        try 
        {
            Class.forName("oracle.jdbc.driver.OracleDriver");

            /* Placeholder host, port, SID and credentials: replace with your own. */
            db_conn =
                    DriverManager.getConnection(
                            "jdbc:oracle:thin:@hostname.com:1521:sid", 
                            "USER", "USER");

            System.out.println("JDBC Connection opened "); 
            db_conn.setAutoCommit(false);

            /* Load the Oracle AQ driver: */
            Class.forName("oracle.AQ.AQOracleDriver");

            /* Creating an AQ Session: */
            aq_sess = AQDriverManager.createAQSession(db_conn);
            System.out.println("Successfully created AQSession ");  
        }
        catch (Exception ex)
        {
            System.out.println("Exception: " + ex); 
            ex.printStackTrace();

            /* No session is handed back, so nobody else can close the connection. */
            if (aq_sess == null && db_conn != null)
            {
                try
                {
                    db_conn.close();
                }
                catch (SQLException closeEx)
                {
                    System.out.println("Exception: " + closeEx); 
                    closeEx.printStackTrace();
                }
            }
        }  
        return aq_sess;
    }

    /**
     * Creates queue table aq_table1 (RAW payload) and queue aq_queue1 in the
     * USER schema, then starts the queue.
     *
     * @param aq_sess open AQ session
     * @throws AQException if creating the table or queue, or starting the queue, fails
     */
    public static void createAqTables(AQSession aq_sess) throws AQException
    {
        /* Queue table property object with payload type RAW: */
        AQQueueTableProperty qtable_prop = new AQQueueTableProperty("RAW"); 

        /* Create queue table aq_table1 in the USER schema: */
        AQQueueTable q_table = aq_sess.createQueueTable ("USER", "aq_table1", qtable_prop);
        System.out.println("Successfully created aq_table1 in aqjava schema");  

        /* Create queue aq_queue1 in aq_table1 with default queue properties: */
        AQQueueProperty queue_prop = new AQQueueProperty();
        AQQueue queue = aq_sess.createQueue (q_table, "aq_queue1", queue_prop);
        System.out.println("Successfully created aq_queue1 in aq_table1");  

        /* Start the queue: */
        queue.start();
        System.out.println("Successful start queue");  
    }

    /**
     * Enqueues the text "new message" as a RAW payload on USER.aq_queue1 and
     * commits the session's JDBC connection.
     *
     * @param aq_sess open AQ session (must be an AQOracleSession)
     * @throws AQException if looking up the queue or enqueuing fails
     * @throws IllegalStateException if the commit fails
     */
    public static void enqueueMsg(AQSession aq_sess) throws AQException
    {
        Connection db_conn = ((AQOracleSession)aq_sess).getDBConnection();

        /* Get a handle to queue table aq_table1 in the USER schema: */
        aq_sess.getQueueTable ("USER", "aq_table1");
        System.out.println("Successful getQueueTable");  

        /* Get a handle to queue aq_queue1 in the USER schema: */
        AQQueue queue = aq_sess.getQueue ("USER", "aq_queue1");
        System.out.println("Successful getQueue");  

        /* Create a message and fill its raw payload; the charset is fixed so the
           bytes do not depend on the platform default: */
        AQMessage message = queue.createMessage();
        byte[] b_array = "new message".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        AQRawPayload raw_payload = message.getRawPayload();
        raw_payload.setStream(b_array, b_array.length);

        /* Enqueue the message with default options: */
        AQEnqueueOption enq_option = new AQEnqueueOption();
        queue.enqueue(enq_option, message);

        commitOrFail(db_conn);
    }

    /**
     * Dequeues one message from USER.aq_queue1 with default dequeue options,
     * prints its payload as text and commits the session's JDBC connection.
     *
     * @param aq_sess open AQ session (must be an AQOracleSession)
     * @throws AQException if looking up the queue or dequeuing fails
     * @throws IllegalStateException if the commit fails
     */
    public static void dequeueMsg(AQSession aq_sess) throws AQException
    {
        Connection db_conn = ((AQOracleSession)aq_sess).getDBConnection();

        /* Get a handle to queue table aq_table1 in the USER schema: */
        aq_sess.getQueueTable ("USER", "aq_table1");
        System.out.println("Successful getQueueTable");  

        /* Get a handle to queue aq_queue1 in the USER schema: */
        AQQueue queue = aq_sess.getQueue ("USER", "aq_queue1");
        System.out.println("Successful getQueue");  

        /* Dequeue the message with default options: */
        AQDequeueOption deq_option = new AQDequeueOption();
        AQMessage message = queue.dequeue(deq_option);

        /* Decode with the same charset enqueueMsg uses to encode: */
        byte[] b_array = message.getRawPayload().getBytes();
        String msg = new String(b_array, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Dequeue Msg "+msg);

        commitOrFail(db_conn);
    }

    /**
     * Commits the connection; a failed commit is rethrown unchecked so the
     * caller does not continue as if the enqueue/dequeue had been made durable.
     */
    private static void commitOrFail(Connection db_conn)
    {
        try
        {
            db_conn.commit();
        }
        catch (SQLException e)
        {
            throw new IllegalStateException("Commit of AQ transaction failed", e);
        }
    }
}

我创建了一个队列表和一个队列。消息已经成功写入队列,并从队列中读取出来。

Q1。我可以向同一个队列再写入一条消息并从中读取吗?如果可以,应该怎么做?我尝试过向同一个队列再次写入消息,但没有成功。

Q2。如何将上面的代码改为发布/订阅模式?如何通过多次读取同一条消息来测试它?

任何帮助表示赞赏。

4

2 回答 2

3

1)登录到oracle db并创建用户。

-- Create the account jmsuser (password "a") that the Java examples in this file use as schema owner.
CREATE USER jmsuser IDENTIFIED BY a;
-- NOTE(review): DBA is a very broad role, presumably granted here for demo convenience -- confirm before using outside a sandbox.
GRANT DBA, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to jmsuser;
-- Allow jmsuser to execute the packages named below.
GRANT EXECUTE ON DBMS_AQADM TO jmsuser;
GRANT EXECUTE ON DBMS_AQ TO jmsuser;
GRANT EXECUTE ON DBMS_LOB TO jmsuser;
GRANT EXECUTE ON DBMS_JMS_PLSQL TO jmsuser;

2)下面的类创建一个多消费者队列,并为该队列注册两个订阅者。(ConnectionDefinition.getOracleConnection() 返回一个到 Oracle 的普通 JDBC 连接)

import java.sql.Connection;
import oracle.AQ.AQAgent;
import oracle.AQ.AQDriverManager;
import oracle.AQ.AQQueue;
import oracle.AQ.AQQueueProperty;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.AQ.AQSession;

/**
 * Creates the multi-consumer queue table aq_table5 and queue aq_queue5 in the
 * jmsuser schema, registers two subscribers on the queue and starts it.
 *
 * @author alukasiewicz
 */
public class NewClass {

    /**
     * Sets up the queue objects over the connection returned by
     * ConnectionDefinition.getOracleConnection(). The AQ session and the JDBC
     * connection are closed even when a step fails.
     *
     * @param args not used
     * @throws Exception if loading the AQ driver or any AQ/JDBC call fails
     */
    public static void main(String[] args) throws Exception {
        Class.forName("oracle.AQ.AQOracleDriver");
        Connection con = ConnectionDefinition.getOracleConnection();
        try {
            AQSession aq_sess = AQDriverManager.createAQSession(con);
            try {
                // Queue table with the JMS bytes-message payload type, open to multiple consumers.
                AQQueueTableProperty qtable_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
                qtable_prop.setMultiConsumer(true);
                AQQueueTable q_table = aq_sess.createQueueTable("jmsuser", "aq_table5", qtable_prop);

                AQQueueProperty queue_prop = new AQQueueProperty();
                AQQueue queue = aq_sess.createQueue(q_table, "aq_queue5", queue_prop);
                System.out.println("Successful createQueue");

                // The Subscriber example in this file consumes as GREEN and RED,
                // so those are the subscriber names registered here.
                AQAgent subs1 = new AQAgent("GREEN", "", 0);
                AQAgent subs2 = new AQAgent("RED", "", 0);
                queue.addSubscriber(subs2, null);
                queue.addSubscriber(subs1, null);

                queue.start();
                // Reported only after start() has actually returned.
                System.out.println("Successful start queue");
            } finally {
                aq_sess.close();
            }
        } finally {
            con.close();
        }
    }
}

3) 下面的类将消息发布到队列。

    /**
     * Publishes two JMS bytes messages to topic AQ_QUEUE5 in schema JMSUSER:
     * one without a recipient list and one addressed only to the agent "GREEN".
     */
    public class Publisher {

        /**
         * Publishes the two messages and commits them through the JDBC connection
         * the topic connection was created on. Both connections are closed even
         * when a step fails.
         *
         * @param args not used
         * @throws Exception if loading the AQ driver or any JMS/JDBC call fails
         */
        public static void main(String[] args) throws Exception {
            Class.forName("oracle.AQ.AQOracleDriver");
            Connection con = ConnectionDefinition.getOracleConnection();
            try {
                TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
                try {
                    tc_conn.start();
                    TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
                    Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("JMSUSER", "AQ_QUEUE5");
                    AQjmsTopicPublisher publisherAq = (AQjmsTopicPublisher) jms_sess.createPublisher(queueTopic);

                    BytesMessage messAll = jms_sess.createBytesMessage();
                    messAll.writeUTF("Message for all subscribers");
                    BytesMessage messOnlyForGreen = jms_sess.createBytesMessage();
                    messOnlyForGreen.writeUTF("Message only for green");

                    // No recipient list on the first publish; explicit recipient GREEN on the second.
                    publisherAq.publish(messAll);
                    publisherAq.publish(messOnlyForGreen, new AQjmsAgent[]{new AQjmsAgent("GREEN", null)});

                    con.commit();
                } finally {
                    tc_conn.close();
                }
            } finally {
                con.close();
            }
        }
    }

在 Oracle 中,您可以用下面的查询查看队列中的这些消息:两条属于 GREEN 订阅者,一条属于 RED 订阅者。

    -- Shows queue, message state and consumer name for every row of jmsuser.aq$aq_table5.
    SELECT a.queue,  a.msg_state, a.consumer_name FROM jmsuser.aq$aq_table5 a

4) 下面的类从队列中读取消息:

/**
 * Reads all currently available messages from topic AQ_QUEUE5 in schema
 * jmsuser, first as durable subscriber "GREEN" and then as "RED", printing
 * each message to standard error.
 */
public class Subscriber {

    /**
     * Drains both subscribers and commits through the JDBC connection the topic
     * connection was created on. Both connections are closed even when a step fails.
     *
     * @param args not used
     * @throws Exception if loading the AQ driver or any JMS/JDBC call fails
     */
    public static void main(String[] args) throws Exception {
        Class.forName("oracle.AQ.AQOracleDriver");
        Connection con = ConnectionDefinition.getOracleConnection();
        try {
            TopicConnection tc_conn = AQjmsTopicConnectionFactory.createTopicConnection(con);
            try {
                TopicSession jms_sess = tc_conn.createTopicSession(true, Session.SESSION_TRANSACTED);
                tc_conn.start();
                Topic queueTopic = ((AQjmsSession) jms_sess).getTopic("jmsuser", "AQ_QUEUE5");
                TopicSubscriber subGreen = (TopicSubscriber) ((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "GREEN");
                TopicSubscriber subRed = (TopicSubscriber) ((AQjmsSession) jms_sess).createDurableSubscriber(queueTopic, "RED");

                drain(subGreen, "GREEN", "green");
                System.err.println("  ");
                drain(subRed, "RED", "red");

                con.commit();
            } finally {
                tc_conn.close();
            }
        } finally {
            con.close();
        }
    }

    /**
     * Receives messages from one subscriber until a receive call times out
     * (10 ms) and returns null, printing each message body.
     *
     * @param subscriber subscriber to read from
     * @param tag        name printed in front of every received message
     * @param label      name used in the start/end lines
     * @throws Exception if receiving or reading a message fails
     */
    private static void drain(TopicSubscriber subscriber, String tag, String label) throws Exception {
        System.err.println("Start receiving message for " + label + " subscriber");
        Message msg = subscriber.receive(10); // receive with timeout
        while (msg != null) {
            System.err.println("     " + tag + " recive message " + ((BytesMessage) msg).readUTF());
            msg = subscriber.receive(10); // receive with timeout
        }
        System.err.println("End receiving message for " + label + " subscriber");
    }
}

5) Pom 依赖

 <!-- Libraries declared for the examples in this file. -->
 <!-- NOTE(review): the com.oracle artifacts are presumably not resolvable from a public repository and must be installed locally - confirm. -->
 <dependencies>
        <!-- NOTE(review): presumably the Oracle JDBC driver (oracle.jdbc.driver.OracleDriver) - confirm. -->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>11.2.0.4</version>
        </dependency>
        <!-- NOTE(review): presumably provides the oracle.AQ / oracle.jms classes used by the examples - confirm. -->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>aqapi</artifactId>
            <version>13</version>
        </dependency>
        <!-- JMS 1.1 API artifact (javax.jms:jms). -->
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
            <version>1.1</version>
        </dependency>
        <!-- JTA 1.1 API artifact (javax.transaction:jta). -->
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
        </dependency>
        <!-- NOTE(review): presumably Oracle internationalization support needed by the driver - confirm. -->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>orai18n</artifactId>
            <version>11.2.0.4</version>
        </dependency>
    </dependencies>
于 2015-07-17T09:29:20.707 回答
-1

您的队列是具有两个注册订阅者(“SUB1”,“SUB2”)的多消费者队列。
1) 在不指定订阅者的情况下将消息排入队列。
2) 消息会为每个订阅者各复制一份。可以通过执行查询 select consumer_name from aq$queue_table_name; 来检查这一点,该查询应返回 SUB1 和 SUB2。
3)您不需要通知订阅者。所有订阅者都在监听队列,等待属于自己的消息。
Oracle 提供了一份非常有用的手册:《Streams Advanced Queuing User's Guide》。

于 2015-07-15T09:43:22.417 回答