4

I am processing a huge xml document (which contains around a million entries) and subsequently importing a formatted version to the db using rabbitmq. Each time after publishing around 200,000 entries I receive a broken pipe error , and rabbitmq is unable to recover from it.

Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

This subsequently causes a node down error and the process needs to be manually killed to recover from it.

These are my class methods:-

public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    }
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
        );
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
    }
}

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    }

    if(isset($this->ch)) {
        $this->ch->close();
    }

    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    );
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching

    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    );

    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    );

    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']
    );
}

Any help will be appreciated.

4

3 回答 3

10

确保您在 Rabbit MQ 上为您的用户添加了虚拟主机访问权限。我创建了新用户并忘记了默认使用的“/”主机的设置访问权限。

您可以通过管理面板 yourhost:15672 > Admin > 单击用户 > 查找“设置权限”来执行此操作。

PS 我假设您的 RabbitMQ 服务正在运行,用户存在且密码正确。

于 2014-10-30T09:23:02.060 回答
4

实际上,当您的消息中有大量内容并且您的消费者花费太多时间仅处理一条消息时,就会发生此问题,即向兔子响应“ACK”并尝试使用另一条消息的问题。

例如,当我遇到这个问题时,我会尝试“适应”我的消息,因为它是一个产品工作者,并且每条消息都有一些类似 1k 的产品 id,所以我改为 100 个产品并且效果很好。

您可以在此处阅读有关使用心跳检测死 TCP 连接的更多信息

于 2016-10-14T20:17:33.343 回答
1

这个问题发生在我与 RabbitMQ 的连接中断时(原因无关紧要,在我的情况下,我故意停止 RabbitMQ 服务以进行一些失败测试),并且我试图通过关闭旧连接并初始化再次重新连接到 RabbitMQ一个新的,但我收到Broken pipe or closed connection错误。我解决这个问题的方法是reconnect()在我的连接上使用方法:

$channel->reconnect();
于 2019-03-13T22:15:17.383 回答