1

我已经了解了如何创建带有 AQ(Streams?)包的 Oracle 数据库,并且已经使用 PL/SQL 和 SQL 在 Oracle 中手动创建了一些队列。

但是,我很难从 Spring 建立正确的连接。

以下代码可以正常工作(使用 oracle.AQ Java 包):

// JDBC URL and credentials used to open the connection in testManualAQ().
private final String aqUrl = "jdbc:oracle:thin:@localhost:1521:orcl";
private final String aqUser = "queue_mut";
// NOTE(review): looks like a redacted placeholder rather than a real password — confirm.
private final String aqPassword = "******";
// Schema, queue table and queue names handed to the oracle.AQ calls.
private final String aqSchema = "queue_mut";
private final String aqTable = "aq_table1";
private final String aqQueue = "aq_queue1";


/**
 * Creates an AQ queue table and a queue through the plain {@code oracle.AQ} API,
 * without involving Spring.
 *
 * <p>The JDBC connection is opened with auto-commit disabled and is closed when the
 * method exits, whether or not the queue creation succeeds.
 *
 * @throws ClassNotFoundException if the Oracle JDBC or AQ driver class cannot be loaded
 * @throws SQLException if the database connection cannot be opened, configured or closed
 * @throws AQException if the AQ session, queue table or queue cannot be created
 */
@Test
public void testManualAQ() throws ClassNotFoundException, SQLException, AQException {

    Class.forName("oracle.jdbc.driver.OracleDriver");

    // try-with-resources guarantees the connection is released; previously it was never closed.
    try (Connection connection = DriverManager.getConnection(aqUrl, aqUser, aqPassword)) {
        connection.setAutoCommit(false);

        Class.forName("oracle.AQ.AQOracleDriver");
        AQSession aqSession = AQDriverManager.createAQSession(connection);

        // The queue table is created with a "RAW" payload type, then the queue inside it.
        AQQueueTable queueTable =
                aqSession.createQueueTable(aqSchema, aqTable, new AQQueueTableProperty("RAW"));
        aqSession.createQueue(queueTable, aqQueue, new AQQueueProperty());
    }
}

(基于 https://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm)

这表明我可以连接到 Oracle 并使用 AQ 功能。

现在,我正在尝试创建基于 Java 配置的 bean,以便使用 JmsTemplate。

// JmsTemplate used by testJmsTemplate(); supplied through @Resource injection.
@Resource
private JmsTemplate jmsTemplate;

/**
 * Sends a small XML product document as a {@code String} to the
 * {@code jms_ws_incoming_queue} destination, qualified with the AQ schema name,
 * using the injected {@code JmsTemplate}.
 */
@Test
public void testJmsTemplate() {
    // Destination is addressed as "<schema>.<queue>".
    final String destination = aqSchema + ".jms_ws_incoming_queue";

    // Lines are joined with '\n'; the last line carries no trailing newline.
    final String payload = String.join("\n",
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<product id=\"10\">",
            " <description>Foo</description>",
            " <price>2.05</price>",
            "</product>");

    jmsTemplate.convertAndSend(destination, payload);
}

(是的,队列已经存在;-))

使用以下配置类:

import oracle.jms.AQjmsFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;

/**
 * Spring Java configuration exposing the beans used to reach Oracle AQ through
 * {@link JmsTemplate}: a JDBC transaction manager, an AQ-backed JMS connection
 * factory and the template itself.
 *
 * <p>Every bean is derived from the {@link DataSource} supplied by the context.
 */
@Configuration
public class OracleAQConfiguration {

    /**
     * Builds a transaction manager bound to the given data source.
     *
     * @param dataSource data source whose transactions are managed
     * @return the configured transaction manager
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        final DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource);
        return txManager;
    }

    /**
     * Obtains the JMS connection factory from {@code AQjmsFactory} for the given data source.
     *
     * <p>NOTE(review): the stack trace reported alongside this configuration shows a
     * {@code ClassCastException} to {@code oracle.jdbc.internal.OracleConnection} when the
     * connection handed out is a proxy — presumably because the data source is pooled;
     * confirm which {@code DataSource} implementation is injected here.
     *
     * @param dataSource data source handed to {@code AQjmsFactory}
     * @return the AQ queue connection factory, exposed as a {@link ConnectionFactory}
     * @throws JMSException if {@code AQjmsFactory} cannot create the factory
     */
    @Bean
    public ConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        final ConnectionFactory aqFactory = AQjmsFactory.getQueueConnectionFactory(dataSource);
        return aqFactory;
    }

    /**
     * Builds a {@link JmsTemplate} with transacted sessions on top of the given factory.
     *
     * @param connectionFactory factory the template uses to create connections
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setSessionTransacted(true);
        return template;
    }
}

并具有 yml 属性:

# Spring datasource settings: JDBC URL, credentials and driver class for the Oracle instance.
spring:
  datasource:
    url: jdbc:oracle:thin:@localhost:1521:orcl
    username: queue_mut
    # NOTE(review): looks like a redacted placeholder; an unquoted YAML value that
    # starts with '*' is read as an alias, so quote the real password if needed.
    password: ******
    driverClassName: oracle.jdbc.driver.OracleDriver

但是有了这个,我得到了我无法理解的错误:

2017-04-19 12:11:17,151  INFO my.project.QueueTest: Started QueueTest in 5.305 seconds (JVM running for 6.588)

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is oracle.jms.AQjmsException: Error creating the db_connection; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection

    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:658)
    at my.project.QueueTest.testJmsTemplate(QueueTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: oracle.jms.AQjmsException: Error creating the db_connection
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:625)
    at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:399)
    at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:249)
    at oracle.jms.AQjmsConnectionFactory.createConnection(AQjmsConnectionFactory.java:513)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:474)
    ... 36 more
Caused by: java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
    at oracle.jms.AQjmsGeneralDBConnection.getProviderKey(AQjmsGeneralDBConnection.java:98)
    at oracle.jms.AQjmsGeneralDBConnection.<init>(AQjmsGeneralDBConnection.java:67)
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:566)
    ... 41 more

我认为出现类型转换异常(ClassCastException)是因为该连接实际上是一个 ProxyConnection[PooledConnection[oracle.jdbc.driver.T4CConnection@40016ce1]],但我不知道如何解决这个问题。

4

4 回答 4

3

更换 JDBC 驱动库。在我的情况下,这样做解决了问题(如果仍然不行,请尝试其他版本):

<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc7</artifactId>
    <version>12.1.0.2.0</version>
</dependency>
于 2018-02-14T16:15:26.353 回答
1

我们在尝试从 Spring Boot 访问 Oracle AQ 时遇到了同样的异常。研究表明,由于数据库连接池库不允许访问 oracle AQ 库所需的底层连接而引发此异常。(dbcp 和 tomcat 连接池库均抛出异常,不相同但相似)

当我们从依赖项中删除数据库连接池库时,这个异常就消失了,这会导致整个应用程序没有数据库连接池的不良状态。

我们注意到,如果我们使用以下方法,则不会抛出异常 AQjmsFactory.getQueueConnectionFactory(url, info);

也许解决方案也缺少连接池,但这仅限于从 AQ 读取的组件。应用程序中的其他组件将受益于连接池

这是 Bean 定义的 java 配置:

/**
 * Creates the AQ queue connection factory from a JDBC URL and connection properties
 * instead of from a {@code DataSource}.
 *
 * <p>The URL, user name and password are read from the service info looked up under
 * {@code NAME_PRIMARY_DS}.
 *
 * @return the queue connection factory returned by {@code AQjmsFactory}
 * @throws Exception if the service info lookup or the factory creation fails
 */
@Bean
public QueueConnectionFactory connectionFactory() throws Exception {
    final OracleServiceInfo primaryDs =
            (OracleServiceInfo) this.cloud().getServiceInfo(NAME_PRIMARY_DS);
    final String jdbcUrl = primaryDs.getJdbcUrl();

    // Driver name and credentials are passed to AQ as plain connection properties.
    final Properties connectionProps = new Properties();
    connectionProps.put("driver-name", "oracle.jdbc.OracleDriver");
    connectionProps.put("user", primaryDs.getUserName());
    connectionProps.put("password", primaryDs.getPassword());

    return oracle.jms.AQjmsFactory.getQueueConnectionFactory(jdbcUrl, connectionProps);
}

/**
 * Builds a {@link JmsTemplate} that uses the AQ queue connection factory.
 *
 * @return a template wired to the factory returned by {@code connectionFactory()}
 * @throws Exception propagated from {@code connectionFactory()}
 */
@Bean
public JmsTemplate jmsTemplate() throws Exception {
    final QueueConnectionFactory aqConnectionFactory = connectionFactory();

    final JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(aqConnectionFactory);
    return template;
}

我还不确定这是否是一个好的解决方案。但这绝对是摆脱问题中讨论的异常的一种方法。

于 2017-08-08T00:02:40.723 回答
0

嗨,我也花了相当长的时间才能使连接正常工作,但最终成功了,方法如下:

首先确保您的 Oracle AQ Queue 表的有效负载未设置为 RAW,但最好设置为 Text:SYS.AQ$_JMS_TEXT_MESSAGE

接下来使用类似于下面的 OracleAQConfiguration:

import oracle.jdbc.pool.OracleDataSource;
import oracle.jms.AQjmsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.JMSException;
import javax.jms.QueueConnectionFactory;
import javax.sql.DataSource;
import java.sql.SQLException;

/**
 * Spring configuration that connects to Oracle AQ through an {@link OracleDataSource}
 * and exposes the resulting JMS queue connection factory.
 */
@Configuration
public class OracleAQConfiguration {

    // User, password and JDBC connection string are read from custom properties
    // added to the Spring application.properties.

    @Value("${myapplication.datasource.user}")
    private String user;

    @Value("${myapplication.datasource.password}")
    private String password;

    @Value("${myapplication.datasource.connectionstring}")
    private String connectionstring;

    /**
     * Data source describing the Oracle database that contains the queues.
     *
     * @return an {@link OracleDataSource} configured with the injected user, password and URL,
     *         with implicit caching and fast connection failover switched on
     * @throws SQLException if the data source cannot be created or configured
     */
    @Bean
    public DataSource dataSource() throws SQLException {
        final OracleDataSource oracleDs = new OracleDataSource();

        // Credentials and target database.
        oracleDs.setUser(user);
        oracleDs.setPassword(password);
        oracleDs.setURL(connectionstring);

        // Driver feature switches.
        oracleDs.setImplicitCachingEnabled(true);
        oracleDs.setFastConnectionFailoverEnabled(true);

        return oracleDs;
    }

    /**
     * The key component: connects to the Oracle AQ system using the given data source.
     *
     * @param dataSource data source handed to {@code AQjmsFactory}
     * @return the AQ queue connection factory
     * @throws JMSException if {@code AQjmsFactory} cannot create the factory
     */
    @Bean
    public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        final QueueConnectionFactory aqFactory = AQjmsFactory.getQueueConnectionFactory(dataSource);
        return aqFactory;
    }

}

接下来使用类似于下面的 JMSConfiguration。在这里,我对同一个队列进行读取和写入,这在真实的应用程序集成场景中不太会出现,但用于测试是可以的。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.jms.ConnectionFactory;
import javax.sql.DataSource;

/**
 * JMS wiring for sending to and receiving from an Oracle AQ queue.
 *
 * <p>Both the write and the read side point at the same queue name, {@code MYQUEUE}.
 */
@Configuration
public class JMSConfiguration {
    private static final String QUEUENAME_WRITE = "MYQUEUE";
    private static final String QUEUENAME_READ = "MYQUEUE";

    @Autowired
    private JMSReceiver jmsReceiver;

    /**
     * Spring bean used to write/send/enqueue messages on the queue named by
     * {@code QUEUENAME_WRITE}, using transacted sessions.
     *
     * @param conFactory connection factory the template sends through
     * @return the configured template
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory conFactory) {
        final JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(conFactory);
        template.setSessionTransacted(true);
        template.setDefaultDestinationName(QUEUENAME_WRITE);

        return template;
    }

    /**
     * Spring bean used to read/receive/dequeue messages from the queue named by
     * {@code QUEUENAME_READ}.
     *
     * <p>Reception runs under a code-managed transaction so that the change is committed
     * on Oracle (the message is removed from the queue table). The application's own
     * message-handling code is referenced here as the listener.
     *
     * @param conFactory connection factory the container receives through
     * @param dataSource data source backing the container's transaction manager
     * @return the configured listener container
     */
    @Bean
    public DefaultMessageListenerContainer messageListenerContainer(ConnectionFactory conFactory, DataSource dataSource) {
        // Transaction manager created locally for this container only.
        final DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource);

        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(conFactory);
        container.setDestinationName(QUEUENAME_READ);
        container.setSessionTransacted(true);
        container.setTransactionManager(txManager);

        // Register the self-written JMS receiver as the listener.
        container.setMessageListener(jmsReceiver);
        return container;
    }

}

最后,为了处理传入的 JMS 消息,请使用以下内容:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Listener for messages received from the queue; logs the text of each message.
 *
 * <p>The queue payload type is assumed to be a JMS text message.
 */
@Component
public class JMSReceiver implements SessionAwareMessageListener<Message> {
    private static final Logger logger = LoggerFactory.getLogger(JMSReceiver.class);

    /**
     * Logs the text body of the received message.
     *
     * @param message the received message; expected to be a {@link TextMessage}
     * @param session the JMS session the message was received on (unused here)
     * @throws JMSException if the message text cannot be read
     * @throws ClassCastException if {@code message} is not a {@link TextMessage}
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        // We know/assume the queue payload type was set to 'TextMessage'; anything else fails the cast.
        final TextMessage textMessage = (TextMessage) message;

        // Parameterized logging: same output as concatenation, without building the string by hand.
        logger.info("JMS Text Message received: {}", textMessage.getText());

        // ... further implementation
    }

}
于 2019-11-24T17:24:39.933 回答
0

问题在于 AQ 代码需要一个 OracleConnection,但在使用连接池时,连接会被包装(代理),因此类型转换失败。

于 2018-12-18T06:09:17.583 回答