I am processing a huge xml document (which contains around a million entries) and subsequently importing a formatted version to the db using rabbitmq. Each time after publishing around 200,000 entries I receive a broken pipe error , and rabbitmq is unable to recover from it.

Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc, line 439]

This subsequently causes a node down error and the process needs to be manually killed to recover from it.

These are my class methods:-

public function publishMessage($message) {
    if (!isset($this->conn)) {
    try {
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->publishMessage($message); // try again

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {

    if(isset($this->ch)) {

    $this->conn = new AMQPConnection(
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching




Any help will be appreciated.


3 回答 3


确保您在 Rabbit MQ 上为您的用户添加了虚拟主机访问权限。我创建了新用户并忘记了默认使用的“/”主机的设置访问权限。

您可以通过管理面板 yourhost:15672 > Admin > 单击用户 > 查找“设置权限”来执行此操作。

PS 我假设您的 RabbitMQ 服务正在运行,用户存在且密码正确。

于 2014-10-30T09:23:02.060 回答


例如,当我遇到这个问题时,我会尝试“适应”我的消息,因为它是一个产品工作者,并且每条消息都有一些类似 1k 的产品 id,所以我改为 100 个产品并且效果很好。

您可以在此处阅读有关使用心跳检测死 TCP 连接的更多信息

于 2016-10-14T20:17:33.343 回答

这个问题发生在我与 RabbitMQ 的连接中断时(原因无关紧要,在我的情况下,我故意停止 RabbitMQ 服务以进行一些失败测试),并且我试图通过关闭旧连接并初始化再次重新连接到 RabbitMQ一个新的,但我收到Broken pipe or closed connection错误。我解决这个问题的方法是reconnect()在我的连接上使用方法:

于 2019-03-13T22:15:17.383 回答