0

我有多个后台作业处理器使用如下的公牛包设置

import { CronJob } from 'cron';

import Queue from 'bull';

let queue = new Queue('workers', {
  //  settings: { lockDuration: 60 * 20000 },
  defaultJobOptions: {
    removeOnComplete: true
  },
});


queue
  .on('waiting', function(jobId) {
    // A Job is waiting to be processed as soon as a worker is idling.
    // workerLogger.info(`Job ${jobId} waiting to be processed `);
  })
  .on('completed', async(job, result) => {
    workerLogger.info(`Job ID: ${job.id}, Result: ${result}`);
    try {
      const jobbed = await queue.getJob(job.id);
      if (jobbed) {
        await jobbed.remove();
        workerLogger.info(`removed completed job ${job.id}`);
      }
    } catch (error) {
      throw new Error(error);
    }
  })
  .on('failed', function(job, err) {
    workerLogger.error('job ' + job.id + ' in queue failed... ' + err);
  })
  .on('error', function(err) {
    workerLogger.error('Queue Error... ' + err);
  })
  .on('stalled', function(job) {
    workerLogger.info(
      `stalled job, restarting it again! ${job.queue.name} ${JSON.stringify(
          job.data,
        )} ${job.id} ${job.name}`,
    );
  });

queue.process('healthCheckPing', concurrency, function(job, done) {
  jobs.healthCheckPing(job.data, done);
});

queue.process('test', concurrency, function(job, done) {
  jobs.test(job.data, done);
});

我尝试使用cron包基于 Crontime 运行作业,但只处理了一项作业,请查看下面的示例

  const cron = new CronJob({
    cronTime: '* * * * *',
    onTick: function() {
      (() => {
        workerLogger.info('Pushing test to queue...');

        queue.add('test');

        queue.add(
          'healthCheckPing',
          {
            jobName: 'test',
          },
        );
      })();
    },
    start: true,
    timeZone: 'Africa/Lagos',
  });

我已经尝试了很多方法让它工作,但似乎没有一个工作,将在下面列出一些

  • 运行await queue.obliterate({ force: true });
  • 在单独的实例上运行工作处理器
  • 为作业添加延迟 - queue.add('test',{}, {delay:500});
  • 为作业添加优先级 - queue.add('test',{}, {priority:1});

在上面的两个 Jobs(test, healthCheckPing) 中,只有一个总是被触发。

也就是说,要么名为test的作业得到处理,而另一个没有得到处理,或者名为healthCheckPing的作业得到处理,而另一个没有得到处理

这是下面的作业处理器功能

const jobs = {};

jobs.test = (_, done) => {
  try {
    workerLogger.error('test');

    done(false, 'ok');
  } catch (e) {
    done(e);
  }
};

jobs.healthCheckPing = async({
  jobName
}, done) => {
  try {
    workerLogger.info('health check pinger');

    if (!jobName) throw new Error('uuid not passed');

    // jobname is the slug for monitor

    // use pingkey to negate between staging and prod monitors
    const pingKey = !process.env.NODE_ENV || process.env.NODE_ENV !== 'production' ?
      process.env.STAGING_HEALTH_CHECK_KEY :
      process.env.PROD_HEALTH_CHECK_KEY;

    const url = `https://hc-ping.com/${pingKey}/${jobName}`;

    await axios.get(url);

    done(false, `pinged ${jobName}!`);
  } catch (error) {
    done(error);
  }
};

export default jobs;

此代码在我的本地计算机上完美运行,但此问题仅在生产时出现。节点服务器使用 pm2(cluster mode, instance = 1) 运行

4

1 回答 1

0

问题是因为我在另一个共享相同 Redis 连接的服务中使用了相同的队列名称,

let queue = new Queue('workers', {
  //  settings: { lockDuration: 60 * 20000 },
  defaultJobOptions: {
    removeOnComplete: true
  },
});

工人换成其他人解决了这个问题

于 2021-11-20T19:22:17.757 回答