我正在尝试使用 Bull 框架将 DTO 发送到我的 Redis 队列并在处理器中处理这些 DTO。有时作业会传递给处理器(100 个中的 1 个),但大部分时间都因错误job stalled more than allowable limit
而失败:我不知道如何修复它。
我给你一个小介绍,下面你可以看到我的代码。我创建了queue-api
用作队列包装器的模块,例如订单队列。然后我将此模块导入到我想将 DTO 发布到队列中的模块,在我的情况下order-module
。
queue-api 模块文件
// queue-api.module.ts
@Module({
imports: [
BullModule.registerQueue(
{
name: 'order-queue',
defaultJobOptions: {
backoff: 10000,
attempts: Number.MAX_SAFE_INTEGER,
},
},
),
...
],
providers: [OrderQueue],
exports: [OrderQueue],
})
export class QueueApiModule {}
// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
constructor(
@InjectQueue('order-queue')
private readonly queue: Queue,
) {}
async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
const job = await this.queue.add('send-submit-mail', dto)
console.log(`Job ${job.id} created.`)
}
}
订单模块文件
// order.module.ts
@Module({
imports: [
QueueApiModule,
...
],
providers: [
OrderProcessor,
...
]
})
export class OrderModule {}
// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
constructor(private readonly queue: OrderQueue) {}
@Process('send-submit-mail')
async onProcessSubmitMail(job: Job): Promise<void> {
console.log(`Processing of job ${job.id}`)
}
}
这个处理器处理程序几乎从不被调用。
你知道我的代码有什么问题吗?谢谢你的建议。