0

我正在尝试使用 Bull 框架将 DTO 发送到我的 Redis 队列并在处理器中处理这些 DTO。有时作业会传递给处理器(100 个中的 1 个),但大部分时间都因错误job stalled more than allowable limit而失败:我不知道如何修复它。

我给你一个小介绍,下面你可以看到我的代码。我创建了queue-api用作队列包装器的模块,例如订单队列。然后我将此模块导入到我想将 DTO 发布到队列中的模块,在我的情况下order-module

queue-api 模块文件

// queue-api.module.ts
@Module({
  imports: [
    BullModule.registerQueue(
      {
        name: 'order-queue',
        defaultJobOptions: {
          backoff: 10000,
          attempts: Number.MAX_SAFE_INTEGER,
        },
      },
    ),
    ...
  ],
  providers: [OrderQueue],
  exports: [OrderQueue],
})
export class QueueApiModule {}

// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
  constructor(
    @InjectQueue('order-queue')
    private readonly queue: Queue,
  ) {}

  async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
    const job = await this.queue.add('send-submit-mail', dto)
    console.log(`Job ${job.id} created.`)
  }
}

订单模块文件

// order.module.ts
@Module({
  imports: [
    QueueApiModule,
    ...
  ],
  providers: [
    OrderProcessor,
    ...
  ]
})
export class OrderModule {}

// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
  constructor(private readonly queue: OrderQueue) {}

  @Process('send-submit-mail')
  async onProcessSubmitMail(job: Job): Promise<void> {
    console.log(`Processing of job ${job.id}`)
  }
}

这个处理器处理程序几乎从不被调用。

你知道我的代码有什么问题吗?谢谢你的建议。

4

1 回答 1

0

我遇到了类似的问题,但还没有深入寻找根本原因。但与此同时,我使用了 bull-repl (npm Bull-repl) cli 来查看队列状态。当发生停滞错误时,此后将不会触发任何作业(似乎队列卡在失败的作业上)。如果您在 Bull-repl 中运行 stats,您会看到有一个处于 Active 状态的作业。您可以手动删除它(使用 Bull-repl),然后您将运行下一个作业。我怀疑 QueueScheduler 没有运行,因此没有处理停滞的作业。您还可以增加停滞的超时参数(其中有 2-3 个,检查 [https://docs.bullmq.io/bull/important-notes]),看看它是否有帮助。就我而言,当我暂停调试时会发生锁定。

于 2021-12-26T14:26:10.803 回答