0

我有以下功能,一次预取 3 条消息:

我想在一个数组中收集这 3 条消息并将其传递给 aaaaa.ccccc()

启动工人

function startWorker() {

    amqpConn.createChannel(function (err, ch) {
    
        if (closeOnErr(err)) return;
    
        ch.on("error", function (err) {
            console.error("[AMQP] channel error", err.message);
        });
    
        ch.on("close", function () {
            console.log("[AMQP] channel closed");
        });
    
        //Fetching 3 messages at a time
        ch.prefetch(3);
    
        ch.assertQueue("<queue-name>", { durable: true }, function (err, _ok) {
            if (closeOnErr(err)) return;
            ch.consume("<queue-name>", processMsg, { noAck: false });
            console.log("Worker is started");
        });
    
        function processMsg(msg) {
            work(msg, function (ok) {
                try {
                    if (ok)
                        ch.ack(msg);
                    else
                        ch.reject(msg, true);
                } catch (e) {
                    closeOnErr(e);
                }
            });
        }
    });
}

功函数

async function work(msg, cb) {

    let payload = JSON.parse(msg.content.toString());
    console.log("Got request.. ", payload);

    let aaaaa = bbbbb.find(payload.domain);
    let response;
    
    try {

        //Pass 3 messages as an array to aaaaa.ccccc()
        response = await (aaaaa.ccccc(payload.url, payload.listing));
        console.log("Got response.. ", response);

        // Publish call
        publish("", "publish-queue-name", Buffer.from(JSON.stringify({
            id: payload.id,
            domain: payload.domain,
            url: payload.url,
            data: response.data
        })));

        // ACK
        cb(true);
    } catch (error) {

        console.log(error);

        // Publish error
        publish("", "publish-queue-name-error", Buffer.from(JSON.stringify({
            id: payload.id,
            domain: payload.domain,
            url: payload.url,
            error: error
        })));

        // ACK
        cb(true);
    }
}
4

0 回答 0