3

我在我的应用程序中执行以下流程:

  1. 从代理获取 1 条消息(手动确认)

  2. 做一些处理

  3. 在数据库和代理上启动事务

  4. 在数据库中插入一些记录并在代理上发布一些消息(不同的队列)

  5. 提交数据库和代理

  6. 确认您在步骤 1 中从代理获得的消息。

代理上的所有操作都是通过单个通道完成的。这是准备代码:

Connection brokerConnection = factory.newConnection();              
Channel channel = brokerConnection.createChannel();
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("receive-queue", false, consumer);

以下是我的代码。我已删除try,catch部分以使其清楚。我将所有异常记录到文件中。步骤1:

QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Request request = (Request) SerializationUtils.deserialize(delivery.getBody());

步骤 2、3、4、5:

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

第 6 步:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

一次迭代后,我可以看到数据库和代理中的记录(意味着它在第 5 步之前工作正常)。问题是在第 6 步之后接收队列上的消息没有被删除,并且管理插件显示一条未确认的消息。此外,我在日志文件中没有看到任何异常。任何人都可以帮忙吗?

[更新1]

现在我创建一个发布渠道和另一个接收渠道。现在正在工作。那么如何使用单个通道进行接收和发布(带有事务)?我以前使用单一渠道进行接收和发布,但没有交易。

[更新2]

我在事务中移动了第 6 步,它现在正在工作。

dbConnection.setAutoCommit(false);
channel.txSelect();

stmt = dbConnection.prepareStatement(query);
/* set paramteres */
stmt.executeUpdate();
channel.basicPublish(/* exchange name */, "KEY", MessageProperties.PERSISTENT_BASIC, /* result */ result);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 

dbConnection.commit();
channel.txCommit();
dbConnection.setAutoCommit(true);

我有点困惑。我只希望发布部分在事务内部。

4

1 回答 1

6

您已将通道置于事务模式 - 并且 Acks 是事务性的东西。因此,您要么需要在单独的非事务性通道上消费和确认,要么只接受您的确认需要在 tx.commit 之前。

于 2013-08-19T07:59:48.320 回答