0

我有这个工人文件:

const { orm } = require('../../models');
const config = require('../../config');
const rabbitServices = require('../rabbitMQ');
const router = require('./listenerRouter');

(async () => {
  // connect to mongoose db (work fine)
  await orm.connect(config.db);
  if (config.rabbitMQ.enabled) {
    await rabbitServices.connect(config.rabbitMQ);
    await rabbitServices.createListener(config.dataSync.queue, router);
  }
  console.log('[DATASYNC] Worker started');
})();

我有兔子服务(问题在 addListener 函数中):

const rabbit = require('amqplib');
const SomeModel = require('../../models/SomeModel');

let rabbitConnection;
let channel;
const listeners = [];

const _createChannel = async () => {
  let newChannel;
  try {
    newChannel = await rabbitConnection.createChannel();
    channel = newChannel;
  } catch (error) {
    console.error('[AMQP]channel', error.message);
    return;
  }

  newChannel.on('close', function () {
    console.error('[AMQP] close chanel');
    channel = null;
  });

  console.log('[AMQP] create chanel');
};

const addListener = async (queue, callback, options = {}) => {
  const { ack } = options;
  const removeMessage = (msg) => () => channel.ack(msg);

  if (!channel) {
    console.error('[AMQP] chanel is closed, Listener no created');
    return;
  }
  await channel.assertQueue(queue, {
    durable: false
  });
  
  // This db-query work fine
  const seccessResult = await SomeModel.find({});
  return await channel.consume(queue, async (msg) => {
    if (msg !== null) {
      // But this db-query just aborting whitout any message
      const neverResult = await SomeModel.find({});

      callback(msg, {removeMessage: removeMessage(msg)});
      if (ack) { channel.ack(msg); }
    }
  });
};

const connect = async (options) => {
  const {login, password, host, reconnect_time} = options;
  const URL = `amqp://${login}:${password}@${host}`;
  let conn = null;

  try {
    conn = await rabbit.connect(URL);
  } catch (error) {
    console.error('[AMQP]connect', error.message);
    return setTimeout(connect, reconnect_time, options);
  }

  conn.on('error', function (err) {
    if (err.message !== 'Connection closing') {
      console.error('[AMQP] conn error', err.message);
    }
  });
  conn.on('close', function () {
    console.error('[AMQP] reconnecting');
    rabbitConnection = null;
    return setTimeout(connect, reconnect_time, options);
  });

  console.log(`RabbitMQ connection established on ${host}`);
  rabbitConnection = conn;
  await _createChannel();

  if (listeners.length) {
    await Promise.all(listeners.map(listener => addListener(
      listener.queue, listener.callback, listener.options
    )));
  }
};

const sendMessage = async (queue, message) => {
  if (!channel) {
    console.error('[AMQP] chanel is closed, message not sended');
    return;
  }
  await channel.assertQueue(queue);
  await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
    persistent: true
  });
  console.log(`RabbitMQ send message to queue ${message}`);
};

const createListener = async (queue, callback, options = {}) => {
  listeners.push({queue, callback, options});
  return await addListener(queue, callback, options);
};

module.exports = {
  connect,
  sendMessage,
  createListener
};

我只是不明白为什么 orm 在这个消费回调中不起作用。我花了一整天的时间寻找解决问题的方法,但仍然找不到。如何在侦听器内执行数据库查询?

4

0 回答 0