0

我有两种从 json/csv 导入产品和库存的方法。我已经实现了 NestJS Bull 模块来排队作业。两个导入过程都在异步运行并且工作正常。但是现在我想先完全处理产品导入队列,然后只处理库存导入队列。

工作模块.ts

BullModule.registerQueueAsync({
  name: 'default',
  inject: [ConfigService],
  useFactory: async (
    configService: ConfigService,
  ): Promise<BullModuleOptions> => ({
    redis: {
      ...
    },
    defaultJobOptions: {
      attempts: 1,
    },
  }),
})

工作服务.ts

constructor(@InjectQueue('default') private defaultQueue: Queue) { }

@Cron(CronExpression.EVERY_30_MINUTES)
async queueProductUpdateJob() {
    await this.defaultQueue.add('productUpdate');
}

@Cron(CronExpression.EVERY_30_MINUTES)
async queueInventoryUpdateJob() {
    await this.defaultQueue.add('inventoryUpdate');
}

作业处理器.ts

@Process('productUpdate')
async productUpdate(job: Job<unknown>){
  console.log('data: ', JSON.stringify(job.data));
  await this.updateProductService.importProducts(job.data);
  return {};
}

@Process('inventoryUpdate')
async inventoryUpdate(job: Job<unknown>){
  console.log('data: ', JSON.stringify(job.data));
  await this.updateInventoryService.importInventory(job.data);
  return {};
}

我们如何才能queueProductUpdateJob先完全处理作业,然后再处理queueInventoryUpdateJob

4

1 回答 1

-1

我想如果你像这样改变jobs.service.ts就可以了:(因为他们俩都是每30分钟玉米一次)

工作服务.ts

constructor(@InjectQueue('default') private defaultQueue: Queue) { }

@Cron(CronExpression.EVERY_30_MINUTES)
async queueProductInventoryUpdateJob() {
    await this.defaultQueue.add('productUpdate');
    await this.defaultQueue.add('inventoryUpdate');
}

或者您可以使用“import { queue } from 'async';”来做到这一点 (如果想告诉我更多解释)它会排成一个队列并一个接一个地做作业

于 2021-05-21T09:28:39.183 回答