我正在通过 STOMP 与 ActiveMQ 交互。我有一个发布消息的进程和一个订阅和处理消息的多个进程(大约 10 个并行实例)。
阅读消息后,我想确定如果由于某种原因我的应用程序失败/崩溃,消息不会丢失。所以很自然地,我转向交易。不幸的是,我发现一旦消费者在事务中读取一条消息,在事务结束之前,所有后续消息都不会发送给其他消费者。
测试用例:abc
队列有 100 条消息。如果我在两个不同的浏览器选项卡中激活以下代码,第一个将在 10 秒内返回,第二个将在 20 秒内返回。
<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();
$con->subscribe(
"/queue/abc",
array()
);
$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
$t = microtime(true);
$msg = $con->readFrame();
if (!$msg) {
die("FAILED!");
}
$t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
array_push($messages, $msg);
$con->ack($msg, $tx);
sleep(1);
}
$con->abort($tx);
有什么我在代码方面缺少的东西吗?有没有办法配置 ActiveMQ(或发送标头),使事务从队列中删除项目,允许其他进程使用其他消息,如果事务失败或超时,会将项目放回?
PS:我考虑过为每个阅读过程创建另一个队列 - DetentionQueue,但如果我有选择的话,我真的不想这样做。