3

我想使用 node.js Bull处理预定的作业。基本上我有两个处理器来处理两种类型的工作。有一个配置器可以配置将使用 cron 添加到公牛队列的作业。

调度程序将位于一个微服务中,每个处理器将是一个单独的微服务。所以我将拥有 3 个微服务。

我的问题是我在公牛上使用正确的模式吗?

index.js

const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

处理器配置器.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");

fetcher.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

警报处理器.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

将有三个微服务 -

  1. 节点索引.js
  2. 节点 fetcher-configurator.js
  3. 节点处理器配置器.js

我看到公牛的行为不一致。有时我收到错误缺少作业类型的流程处理程序

4

2 回答 2

6

引用自己希望这对其他人有帮助

这是因为两个工作人员使用相同的队列。Worker 尝试从队列中获取下一个作业,接收到错误类型的作业(例如,“fetcher”而不是“processor”)并失败,因为它知道如何处理“processor”并且不知道如何处理“fetcher”。Bull 不允许您只从队列中获取兼容的工作,两个工作人员都应该能够处理所有类型的工作。最简单的解决方案是使用两个不同的队列,一个用于处理器,一个用于提取器。然后您可以从作业和处理器中删除名称,因为名称由队列定义,因此不再需要它。

https://github.com/OptimalBits/bull/issues/1481

于 2019-09-23T19:32:25.937 回答
0

公牛:

过期队列.js

import Queue from 'bull';
import { ExpirationCompletePublisher } from '../events/publishers/expiration-complete-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  orderId: string;
}

const expirationQueue = new Queue<Payload>('order:expiration', {
  redis: {
    host: process.env.REDIS_HOST, 
  },
});

expirationQueue.process(async (job) => {
  console.log('Expiries order id', job.data.orderId);
  new ExpirationCompletePublisher(natsWrapper.client).publish({
    orderId: job.data.orderId,
  });
});

export { expirationQueue };

促销EndQueue.js

import Queue from 'bull';
import { PromotionEndedPublisher } from '../events/publishers/promotion-ended-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  promotionId: string;
}

const promotionEndQueue = new Queue<Payload>('promotions:end', {
  redis: {
    host: process.env.REDIS_HOST, // look at expiration-depl.yaml
  },
});

promotionEndQueue.process(async (job) => {
  console.log('Expiries promotion id', job.data.promotionId);
  new PromotionEndedPublisher(natsWrapper.client).publish({
    promotionId: job.data.promotionId,
  });
});

export { promotionEndQueue };

order-created-listener.js

import { Listener, OrderCreatedEvent, Subjects } from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { expirationQueue } from '../../queues/expiration-queue';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
  subject: Subjects.OrderCreated = Subjects.OrderCreated;
  queueGroupName = queueGroupName;

  async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.expiresAt).getTime() - new Date().getTime();
    // console.log("delay", delay)
    await expirationQueue.add(
      {
        orderId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}

促销开始-listener.js

import {
  Listener,
  PromotionStartedEvent,
  Subjects,
} from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { promotionEndQueue } from '../../queues/promotions-end-queue';
export class PromotionStartedListener extends Listener<PromotionStartedEvent> {
  subject: Subjects.PromotionStarted = Subjects.PromotionStarted;
  queueGroupName = queueGroupName;

  async onMessage(data: PromotionStartedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.endTime).getTime() - new Date().getTime();

    // console.log("delay", delay)
    await promotionEndQueue.add(
      {
        promotionId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}
于 2021-07-08T20:57:35.427 回答