2

What I am trying to do

I am trying to create a RabbitMQ publisher and subscriber. It works as expected until I restart my RabbitMQ server.

What works

I use the rabbitmq:3-management Docker image, amqplib 5.3, and Node.js 11.10.0 to make this simple program:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

// Opens a new AMQP connection and resolves with a channel created on it.
// Every call connects separately, so each returned channel has its own connection.
function createChannel() {
    const openChannel = (connection) => connection.createChannel();
    const connecting = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
    return connecting.then(openChannel);
}

// Baseline scenario: open two channels (each on its own connection), publish one
// message to queue `q` and consume it again.
Promise.all([createChannel(), createChannel()])

    .then(async (channels) => {
        // The first channel is used for publishing, the second for consuming.
        const [publisherChannel, consumerChannel] = channels;

        // publisher: declare the queue, then enqueue a single message
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer: declare the same queue, then print and acknowledge each delivery
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                // A null msg is skipped; only real deliveries are printed and acked.
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    // Any rejection from the steps above is only printed as a warning.
    .catch(console.warn);

So, first of all, I made two channels. One as publisher, and the other as consumer.

The publisher sends a "something to do" message to the tasks queue.

The consumer then receives the message and prints it to the screen using console.log.

It works as expected.

What doesn't work

First Attempt

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

// Opens a new AMQP connection and resolves with a channel created on it.
// Every call connects separately, so each returned channel has its own connection.
function createChannel() {
    const openChannel = (connection) => connection.createChannel();
    const connecting = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
    return connecting.then(openChannel);
}

// First attempt: create both channels, restart the broker, then keep using the
// channels that were created BEFORE the restart.
Promise.all([createChannel(), createChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        // execSync blocks until each docker command has returned.
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");

        // The pre-restart channels are passed on unchanged.
        // NOTE(review): their connections presumably ended when the broker stopped,
        // which would explain the "Channel ended" error reported for this attempt — confirm.
        return channels;
    })

    .then(async (channels) => {
        // The first channel is used for publishing, the second for consuming.
        const [publisherChannel, consumerChannel] = channels;

        // publisher: declare the queue, then enqueue a single message
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer: declare the same queue, then print and acknowledge each delivery
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                // A null msg is skipped; only real deliveries are printed and acked.
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    // Any rejection from the steps above is only printed as a warning.
    .catch(console.warn);

Similar to my previous attempt, but this time I stop and start the RabbitMQ container (restarting the server) before proceeding.

It doesn't work, I get this error instead:

{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                           
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                        
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }
[guldan@draenor labs]$ node --version
v11.10.0
[guldan@draenor labs]$ docker start rabbitmq && node test.js
rabbitmq
{ Error: Channel ended, no reply will be forthcoming
    at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)                                                                                  
    at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)                                                            
    at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)                                                                   
    at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)                                                      
    at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)                                                             
    at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)                                                                     
    at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)                                                 
    at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)                                                                        
    at Socket.emit (events.js:197:13)
    at emitReadable_ (_stream_readable.js:539:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Channel ended, no reply will be forthcoming
       at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)                                                                               
       at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)                                                         
       at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)                                                                
       at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)                                                   
       at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)                                                          
       at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)                                                                  
       at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)                                              
       at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)                                                                     
       at Socket.emit (events.js:197:13)
       at emitReadable_ (_stream_readable.js:539:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

Second attempt

My first attempt didn't work, so I tried creating new channels after restarting the server:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

// Opens a new AMQP connection and resolves with a channel created on it.
// Every call connects separately, so each returned channel has its own connection.
function createChannel() {
    const openChannel = (connection) => connection.createChannel();
    const connecting = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
    return connecting.then(openChannel);
}

// Second attempt: create two channels, restart the broker, then discard the old
// channels and open two brand-new connections/channels right away.
Promise.all([createChannel(), createChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        // execSync blocks until each docker command has returned.
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");

        // Reconnect immediately after `docker start` returns, with no wait or retry.
        // NOTE(review): the broker is presumably not yet accepting AMQP handshakes at
        // this point, which would explain "Socket closed abruptly during opening
        // handshake" — confirm by retrying with a delay.
        return Promise.all([createChannel(), createChannel()]);
        // return channels;
    })

    .then(async (channels) => {
        // The first channel is used for publishing, the second for consuming.
        const [publisherChannel, consumerChannel] = channels;

        // publisher: declare the queue, then enqueue a single message
        await publisherChannel.assertQueue(q).then(function(ok) {
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer: declare the same queue, then print and acknowledge each delivery
        await consumerChannel.assertQueue(q).then(function(ok) {
            return consumerChannel.consume(q, function(msg) {
                // A null msg is skipped; only real deliveries are printed and acked.
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    // Any rejection from the steps above is only printed as a warning.
    .catch(console.warn);

And this time, I got this error instead:

{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                           
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)                                                        
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

I'm not really sure, but I think the error might be related to https://github.com/squaremo/amqp.node/issues/101.

What I want

I want a workaround/solution to reconnect to RabbitMQ after the server has restarted. Any explanation/suggestion is also welcome.

Edit

I try to go deeper and modify my code a bit:

const q = 'tasks';

const { execSync } = require("child_process");
const amqplib = require("amqplib");

// Starts an AMQP connection attempt and returns it, logging before and after the call.
// NOTE(review): amqplib.connect() is not awaited here, so "connected" is printed
// right after the call is made, before its promise has settled. The log line
// therefore does not show that the handshake succeeded. Because this function is
// async, a caller that awaits it still waits for the real connection (or its failure).
async function createConnection() {
    console.log("connect");
    const conn = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
    console.log("connected");
    return conn;
}

// Requests a channel on the given connection and returns it, logging before and after.
// NOTE(review): conn.createChannel() is not awaited, so "channel created" is printed
// before the channel request has completed.
// NOTE(review): the {durable: false} argument is presumably ignored by createChannel
// (durability looks like a queue option, as in assertQueue) — confirm against amqplib docs.
async function createChannel(conn) {
    console.log("create channel");
    const channel = conn.createChannel({durable: false});
    console.log("channel created");
    return channel;
}

// Builds a connection first, then a channel on top of it; resolves with the channel only.
async function createConnectionAndChannel() {
    return createChannel(await createConnection());
}

// Instrumented variant of the second attempt: same flow (create channels, restart
// the broker, create new channels, publish, consume) with a log line around each step.
Promise.all([createConnectionAndChannel(), createConnectionAndChannel()])

    .then((channels) => {

        // Let's say rabbitmq is down, and then up again
        // execSync blocks until each docker command has returned.
        console.log("restart server");
        execSync("docker stop rabbitmq");
        execSync("docker start rabbitmq");
        console.log("server restarted");

        // The pre-restart channels are dropped; new connections are opened immediately.
        return Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
        // return channels;
    })

    .then(async (channels) => {
        // Only reached when both new connection+channel promises have resolved.
        console.log("channels created");
        const [publisherChannel, consumerChannel] = channels;

        // publisher
        console.log("publish");
        await publisherChannel.assertQueue(q).then(function(ok) {
            // Logged once the queue is asserted, i.e. just before the message is sent.
            console.log("published");
            return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
        });

        // consumer
        console.log("consume");
        await consumerChannel.assertQueue(q).then(function(ok) {
            // Logged once the queue is asserted, i.e. just before the consumer is registered.
            console.log("consumed");
            return consumerChannel.consume(q, function(msg) {
                // A null msg is skipped; only real deliveries are printed and acked.
                if (msg !== null) {
                    console.log(msg.content.toString());
                    consumerChannel.ack(msg);
                }
            });
        });

    })

    // Any rejection from the steps above is only printed as a warning.
    .catch(console.warn);

And I get this output:

connect
connected
connect
connected
create channel
channel created
create channel
channel created
restart server
server restarted
connect
connected
connect
connected
{ Error: Socket closed abruptly during opening handshake
    at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)                                                                                             
    at Socket.emit (events.js:202:15)
    at endReadableNT (_stream_readable.js:1129:12)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  cause:
   Error: Socket closed abruptly during opening handshake
       at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)                                                                                          
       at Socket.emit (events.js:202:15)
       at endReadableNT (_stream_readable.js:1129:12)
       at processTicksAndRejections (internal/process/next_tick.js:76:17),
  isOperational: true }

So I guess amqplib is able to reconnect but fails to create the channel.

4

3 回答 3

2

最后我设法找到了答案:

const { execSync } = require("child_process");
const amqp = require("amqplib");

/**
 * Pauses the calling async flow for a while.
 *
 * @param {number} delay - time to wait, in milliseconds (handed to setTimeout).
 * @returns {Promise<undefined>} resolves with no value once the timer fires.
 */
async function sleep(delay) {
    const timer = new Promise((done) => {
        setTimeout(() => done(), delay);
    });
    return timer;
}

/**
 * Connects to RabbitMQ, opens a confirm channel, registers the configured
 * consumers and publishes the configured messages.
 *
 * If connecting or any setup step fails, the whole procedure is retried after
 * one second, and the returned promise resolves with the channel of the first
 * attempt that succeeds. When an established connection later closes, a new
 * connection and channel are built from the same `config` (consumers are
 * registered again and messages are published again); the channel returned
 * earlier is not replaced.
 *
 * @param {object} [config]
 * @param {string} [config.url=""] AMQP URL passed to `amqp.connect`.
 * @param {Object<string, *>} [config.publishers={}] queue name -> message sent once per (re)connect.
 * @param {Object<string, Function>} [config.listeners={}] queue name -> consumer callback.
 * @returns {Promise<object>} the confirm channel of the first successful attempt.
 */
async function createChannel(config) {
    const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config);
    let connection = null;
    // Set when the catch block below takes over the retry, so that the "close"
    // handler does not start a second, competing reconnect for the same failure.
    let abandoned = false;
    try {
        // create connection
        connection = await amqp.connect(url);
        let channel = null;
        connection.on("error", (error) => {
            console.error("Connection error : ", config, error);
        });
        connection.on("close", async (error) => {
            console.error("Connection close : ", config, error);
            if (channel) {
                try {
                    await channel.close();
                } catch (closeError) {
                    // Best effort only: the channel normally went down together
                    // with the connection, so a failure here is expected.
                }
            }
            await sleep(1000);
            // Checked after the pause: by now a failed setup has had time to flag
            // that it is retrying on its own.
            if (abandoned) {
                return;
            }
            createChannel(config).catch((retryError) => {
                console.error("Reconnect error : ", retryError);
            });
        });
        // create channel
        channel = await connection.createConfirmChannel();
        channel.on("error", (error) => {
            console.error("Channel error : ", config, error);
        });
        channel.on("close", (error) => {
            console.error("Channel close : ", config, error);
        });
        // register listeners (awaited so a failure is caught below instead of
        // surfacing as an unhandled rejection)
        for (const [queue, callback] of Object.entries(listeners)) {
            await channel.assertQueue(queue, { durable: false });
            await channel.consume(queue, callback);
        }
        // publish
        for (const [queue, message] of Object.entries(publishers)) {
            await channel.assertQueue(queue, { durable: false });
            channel.sendToQueue(queue, message);
        }
        return channel;
    } catch (error) {
        console.error("Create connection error : ", error);
        abandoned = true;
        if (connection) {
            try {
                // Do not leak a half-configured connection before retrying.
                await connection.close();
            } catch (closeError) {
                // The connection may already be closed; nothing more to release.
            }
        }
        await sleep(1000);
        // Return the retry so the caller receives the channel that finally works
        // instead of undefined.
        return createChannel(config);
    }
}

/**
 * Demo scenario: publish "hello", restart the broker container, then attach a
 * consumer to the same queue.
 *
 * @returns {Promise<boolean>} resolves with `true` once both channels are set up.
 */
async function main() {
    const brokerUrl = "amqp://root:toor@0.0.0.0:5672";
    const printMessage = (message) => {
        console.log("Receive message ", message.content.toString());
    };

    // publisher side: queue one "hello" message
    await createChannel({
        url: brokerUrl,
        publishers: { queue: Buffer.from("hello") },
    });

    // bounce the rabbitmq container between publishing and consuming
    for (const command of ["docker stop rabbitmq", "docker start rabbitmq"]) {
        execSync(command);
    }

    // consumer side: print whatever arrives on the queue
    await createChannel({
        url: brokerUrl,
        listeners: { queue: printMessage },
    });

    return true;
}

// Entry point: run the demo and print any rejection to stderr.
main().catch((failure) => {
    console.error(failure);
});

基本上,我将通道附加到连接中。所以每当连接产生错误(例如:rabbitmq 服务器关闭)时,程序都会等待一秒钟,并尝试创建一个新连接。

缺点是,我将失去对旧连接及其通道的引用。

为了克服这个麻烦,我将队列、发布者和消费者信息存储在其他地方(在这种情况下,我将其作为createChannel's 参数)。

最后,每次重新连接时,我也会重新创建通道,并重新构建每个发布者和消费者。

不太方便,但至少它按预期工作。

于 2019-02-22T10:39:15.010 回答
2

我遇到了这个问题,因为我传递了 URL“amqp://guest:guest@localhost:15672/”但是现在我在连接 URL 中传递了 URL“amqp://localhost”。传递后我的问题得到了解决。

于 2021-10-19T08:27:48.860 回答
0

丢失引用在这里似乎无关紧要,rabbitmq 仅在通道不存在时创建通道,如果通道存在,则新变量 reference 将指向同一通道。因此,不尝试存储引用可能会使应用程序更轻量级。

于 2021-09-30T14:39:23.883 回答