0

概括

我正在尝试使用我的消费者设置心跳,以便它可以检测到连接断开 - 例如,如果代理重新启动,它可能会重新连接到故障转移。

我使用在本地运行的 Consumer/Dispatcher 和在 AWS 中运行的队列测试了代码,一切正常。但是,当将代码移动到 AWS 时,消费者会与服务器/代理设置心跳,但心跳要么永远不会由服务器/代理发送,要么永远不会被客户端/消费者接收。结果HeartbeatException,一旦请求的服务器心跳间隔过去,就会抛出。

我的代码基于 stomp-php-examples github 中的示例

我对为什么这不起作用的下一个最佳猜测与队列配置有关,因为我使用的是 AWS 提供的默认配置(我认为)。我已经用谷歌搜索并搜索了关于心跳的配置设置,但还没有走得太远,因为这是一个新话题。任何帮助,将不胜感激!

设置

  • 亚马逊 MQ (ActiveMQ 5.15.14)
  • stomp-php 5.0(截至今天的最新版本)

我很高兴提供有关我的设置的更多详细信息。

代码

消费者(精简版)

abstract class AmqConsumerAbstract
{
    /** @var StatefulStomp */
    protected $consumer;

    const HEARTBEAT = 5000;

    public function listen(): void
    {
        $observer = new ServerAliveObserver();
        $this->client->getConnection()->getObservers()->addObserver($observer);
        $this->client->setHeartbeat(0, self::HEARTBEAT); // heartbeats setup here
        // Note: the heartbeat above is longer than the read-timeout below

        $this->client->getConnection()->setReadTimeout(2, 0);

        $this->client->connect();

        $this->consumer = new StatefulStomp($this->client);
        $this->consumer->subscribe(
            $this->getQueueName(),
            null,
            'client'
        );

        if (!$observer->isEnabled()) {
            // we never get here so I assume everything is working OK
            echo 'The Server is not supporting heartbeats.');
            exit(1);
        } else {
            echo sprintf('The Server should send us signals every %d ms.', $observer->getInterval() * 1000);
        }

        try {
            while (true) {
                $frame = $this->consumer->read(); // I assumed this line would read the heartbeat?

                // there is then some logic that deals with payload and does
                // $this->consumer->begin();
                // $this->consumer->ack($frame);
                // $this->consumer->commit();

                if ($observer->isDelayed()) {
                    $this->echoLog('ServerAliveObserver: Server has been delayed.');
                }
            }
        } catch (HeartbeatException $e) {
            echo 'AMQ (STOMP) Error, the server failed to send us heartbeats within the defined interval: ' . $e->getMessage()
            $this->reconnect();
        } catch (ConnectionException $e) {
            echo $e->getMessage();
            $this->reconnect();
        } catch (Exception $e) {
            echo 'AMQ (STOMP) Queue error: ' . $e->getMessage();
            exit(1);
        }
    }
}

我已经在 5.15.14 版本的单实例代理上进行了复制。

我已经尝试为代理打开 STOMP 调试,但似乎不允许使用uri下面的属性。<transportConnector>配置将保存但拉出uri属性。

<transportConnector>Amazon MQ 配置中允许的元素及其属性

使用 Spring XML 配置文件(xsd 文件)

4

0 回答 0