0

我有 Nest.js 应用程序,它以一定的延迟将作业添加到 Bull 队列:

this.appQueue.add(
  {
    message: data,
  },
  {
    delay: APP_DELAY,
  },
);

现在这个应用程序没有任何消费者(即用@Processor()装饰器装饰的类和用方法装饰的方法。相反,只有当队列中有等待作业时才会触发@Process()另一个应用程序(实际上是用创建的同一部分应用程序的实例)NestFactory.createApplicationContext(AppModule)它的任务是处理这些工作:

async function bootstrap(): Promise<void> {
  const app = await NestFactory.createApplicationContext(AppModule);
  const appConsumer = app.get(AppConsumer);

  appConsumer.once(AppConsumerEvents.WorkCompleted, async () => {
    process.exit(0);

    await app.close();
  });
}

bootstrap();
// file with consumer class
@Processor(APP_QUEUE_NAME)
export class AppConsumer extends EventEmitter {
  public constructor(
    @InjectQueue(APP_QUEUE_NAME)
    private readonly appQueue: Queue<IAppQueueData>,
    private readonly appService: AppService,
  ) {
    super();

    this.init();
  }

  private async init(): Promise<void> {
    const waitingCount = await this.appQueue.getWaitingCount();
    const delayed = await this.appQueue.getDelayedCount();

    console.log(waitingCount, delayed);

    if (waitingCount === 0) {
      this.emit(AppConsumerEvents.WorkCompleted);
    }
  }

  @Process()
  public processJobs(job: Job<IAppQueueData>): Promise<void> {
    const { namespace, token, message } = job.data;
    console.log(`Job ${job.id} processed with worker ${ID}`);

    return this.appService.publishMessage(message, token, namespace);
  }

  @OnGlobalQueueDrained()
  public onGlobalQueueDrained(): void {
    this.emit(AppConsumerEvents.WorkCompleted);
  }
}

现在的问题是:因为主应用程序没有任何消费者正在运行,所以添加了一些延迟的作业会在队列中保持延迟,直到有一些消费者正在运行,它将作业从延迟提升到在指定时间过去后等待(这不是' t 在文档中的任何地方都明确说明,但它看起来像那样工作)。如果我在主应用程序中定义使用者,那么它会将作业从延迟提升到等待,但同时它正在处理它们,因此它们最终在队列中处于“完成”状态。我做了一个临时解决方案,在主应用程序中我创建了一个 cron 作业,它在指定的时间间隔内获取所有延迟的作业,检查作业是否可以提升,并提升它们。

@Cron(CronExpression.EVERY_30_SECONDS)
  public async checkIfJobsShouldBePromoted(): Promise<void> {
    const delayedJobs = await this.appQueue.getDelayed();

    if (delayedJobs.length) {
      for (const job of delayedJobs) {
        if (Date.now() > job.timestamp + APP_QUEUE_DELAY) {
          console.log(`job ${job.id} is ready to be promoted!`);
          job.promote();
        } else {
          console.log(`job ${job.id} not ready to be promoted yet :(`);
        }
      }
    }
  }

它工作正常 - 但我想要另一种解决方案,无需手动操作。理想情况下,我想定义一个消费者,它的唯一责任是将工作提升到等待状态但不处理它们。阅读 Nest.js 和 Bull 文档我找不到这样的选项。这可以实现吗?

提前致谢。

4

0 回答 0