0

尝试监听 Kafka 消费者并在事务中执行一些查询并存储到 JDBC 数据存储区。执行一个查询后,我收到连接关闭错误。如果删除了 transacted() ,则不会出现这种情况。但我的用例是在事务中使用来自 kafka 消费者的数据后执行多个查询,即执行并持久化数据库中的所有查询,或者如果一个异常发生则不执行。

StudentRouteBuilder.java-

@Service 公共类 StudentRouteBuilder 扩展 RouteBuilder {

@Override
public void configure() throws Exception {

    from("kafka:my-poc?brokers=localhost:9092")
            .transacted()
            .process(exchange -> {
                List<String> query = new ArrayList<>();
                        query.add("Insert into Student values (12, 'Robby', 5)");
                        //query.add("Insert into Student values (12, 'Alice', 5)");
                exchange.getIn().setBody(query);
            }).to("jdbc:dataSource")
        .log("body saved in both");

}

}

应用程序.yml -

spring: jackson: property-naming-strategy: SNAKE_CASE 数据源: url: jdbc:mysql://localhost:3306/test_db?autoReconnect=true 用户名: root 密码: Password@1 平台: mysql driver-class-name: com.mysql .cj.jdbc.Driver

kafka:服务器:XXXX bootstrap-servers-ttc-ssl:XXXX 主题:XXXX

错误跟踪 -

2021-05-25 23:13:01.096 WARN 20785 --- [onSumer [my-poc]] oacsspi.transactionerrorhandler:交易回滚(0x2842c098)重新载入(0x2842c098)for(forse)(forse)捕获:JDBC 提交失败;嵌套异常是 java.sql.SQLException: Connection is closed 2021-05-25 23:13:01.100 WARN 20785 --- [onsumer[my-poc]] oacamel.component.kafka.KafkaConsumer:处理过程中出错。交换[47559BD6C12528D-0000000000000000]。引起:[org.springframework.transaction.TransactionSystemException - JDBC提交失败;嵌套异常是 java.sql.SQLException: Connection is closed]

org.springframework.transaction.TransactionSystemException:JDBC 提交失败;嵌套异常是 java.sql.SQLException: Connection is closed at org.springframework.jdbc.datasource.DataSourceTransactionManager.translateException(DataSourceTransactionManager.java:435) ~[spring-jdbc-5.3.6.jar:5.3.6] at org. springframework.jdbc.support.JdbcTransactionManager.translateException(JdbcTransactionManager.java:188) ~[spring-jdbc-5.3.6.jar:5.3.6] at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:336 ) ~[spring-jdbc-5.3.6.jar:5.3.6] at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743) ~[spring-tx-5.3.6.jar:5.3. 6] 在 org.springframework.transaction.support.AbstractPlatformTransactionManager。

4

0 回答 0