4

我有 65 个同名作业要在一个队列中执行,其中包含 3 个消费者实例(A、B、C)。我想一次让每个消费者执行 10 个工作。10 次执行完成后,如果队列中有大于 10 的可用作业,则消费者再次执行 10 次作业。如果不执行可用作业。

工作 1 至 65

消费者 A 执行 1 到 10 消费者 B 执行 11 到 20 消费者 C 执行 21 到 30

让B,A,C按顺序完成执行。然后

B - 31,32,33,.40 A - 41,42,43,.50 C - 51,52,53,.60

如果 C 先完成执行,则 C 执行剩余的 5 个作业。请我知道有什么方法可以做到这一点。

制片人

@Injectable()
export class SampleQueueProducerService {
  constructor(@InjectQueue('sample-queue') private sampleQueue: Queue) {}

  async sendDataToJob(message: string) {
    await this.sampleQueue.add('job', { message });
  }
}

消费者

@Processor('sample-queue')
export class SampleQueueConsumerService {
  @Process({ name: 'job' })
  async sampleJob(job: Job<any>) {
    console.log(job.data);
  }
}

所有 3 个消费者都与上述相同。

4

1 回答 1

2

您可以使用此功能获得 x 个工作(公牛参考

但是,NestJS 不完全支持此功能

您可以做的是使用@OnGlobalQueueWaiting()@OnQueueWaiting()获取等待的作业,将它们存储在一个数组中,一旦达到 10,您就可以将它们发送到处理。

就像是:

@OnGlobalQueueWaiting()
async onGlobalWaiting(jobId: number, result: any) {
  const job = await this.myQueue.getJob(jobId);
  this.jobArray.push(job)
  if (this.jobArray.length >= 10) {
       await this.processJobs(this.jobArray);
       this.jobArray = [];
   }
}

async processJobs(jobs: Job[]){'
   jobs.forEach(job => do something)
}
 
 
于 2021-07-21T14:25:57.730 回答