1

我需要使用 PHP 在 ActiveMQ 队列中查找特定消息并将其删除。

AFAIK 这样做的唯一方法是读取当前排队的所有消息并确认我感兴趣的一条消息。(Stomp::ack 的 PHP 手册中的示例或多或少做同样的事情,他们没有t 读取所有消息,但只确认匹配的消息)。

所以,我写了这段代码(显然,这只是相关部分):

class StompController {

    private $con;

    public function __construct($stompSettings) {
        try {
            $this->con = new Stomp($stompSettings['scheme']."://".$stompSettings['host'].":".$stompSettings['port']);
            $this->con->connect();
            $this->con->setReadTimeout(5);
        } catch(StompException $e) {
            die('Connection failed:' .$e->getMessage());
        }
    }

    public function __destruct() {
        $this->con->disconnect();
    }

    public function ackMessageAsRead($recipient,$message) {
        if($this->con->isConnected()) {
            //Subscribe to the recipient user's message queue.
            $this->con->subscribe("/queue/".$recipient);
            //Read all messages currently in the queue (but only ACK the one we're interested in).
            while($this->con->hasFrameToRead()) {
                $msg = $this->con->readFrame();
                if($msg != null && $msg != false) {
                    //This is the message we are currently reading, ACK it to AMQ and be done with it.
                    if($msg->body == $message) {
                        $this->con->ack($msg);
                    }
                } 
            }
        } else {
            return false;
        }
    }
}

根据我的逻辑,这应该可行。在运行代码时,尽管检查了更多帧,但仅读取了一条随机消息。

下一帧似乎只有在我们当前正在读取的帧已被确认时才准备好。(当我手动确认所有消息时,while循环按预期工作,所有消息都被处理。

有谁知道如何在不确认所有消息的情况下从队列中获取完整的消息集?我可以确认所有这些,然后将我不感兴趣的那些放回队列中,但是这种查找单个消息的效率已经很低的方式变得更加低效。

4

1 回答 1

2

我认为这是由于设置activemq.prefetchSize为 1 造成的问题。ActiveMQ 使用预取大小来确定在任何时间点可以将多少条消息分派给消费者。一旦达到预取大小,在消费者开始发回确认之前,不会再向消费者发送消息。据我所知,增加预取大小应该可以解决您的问题。

有关预取限制的更多详细信息,请阅读http://activemq.apache.org/what-is-the-prefetch-limit-for.html

于 2012-09-13T06:15:58.063 回答