我有以下功能,一次预取 3 条消息:
我想在一个数组中收集这 3 条消息并将其传递给 aaaaa.ccccc()
启动工人
function startWorker() {
amqpConn.createChannel(function (err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function (err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function () {
console.log("[AMQP] channel closed");
});
//Fetching 3 messages at a time
ch.prefetch(3);
ch.assertQueue("<queue-name>", { durable: true }, function (err, _ok) {
if (closeOnErr(err)) return;
ch.consume("<queue-name>", processMsg, { noAck: false });
console.log("Worker is started");
});
function processMsg(msg) {
work(msg, function (ok) {
try {
if (ok)
ch.ack(msg);
else
ch.reject(msg, true);
} catch (e) {
closeOnErr(e);
}
});
}
});
}
功函数
async function work(msg, cb) {
let payload = JSON.parse(msg.content.toString());
console.log("Got request.. ", payload);
let aaaaa = bbbbb.find(payload.domain);
let response;
try {
//Pass 3 messages as an array to aaaaa.ccccc()
response = await (aaaaa.ccccc(payload.url, payload.listing));
console.log("Got response.. ", response);
// Publish call
publish("", "publish-queue-name", Buffer.from(JSON.stringify({
id: payload.id,
domain: payload.domain,
url: payload.url,
data: response.data
})));
// ACK
cb(true);
} catch (error) {
console.log(error);
// Publish error
publish("", "publish-queue-name-error", Buffer.from(JSON.stringify({
id: payload.id,
domain: payload.domain,
url: payload.url,
error: error
})));
// ACK
cb(true);
}
}