我有 Nest.js 应用程序,它以一定的延迟将作业添加到 Bull 队列:
this.appQueue.add(
{
message: data,
},
{
delay: APP_DELAY,
},
);
现在这个应用程序没有任何消费者(即用@Processor()
装饰器装饰的类和用方法装饰的方法。相反,只有当队列中有等待作业时才会触发@Process()
另一个应用程序(实际上是用创建的同一部分应用程序的实例)NestFactory.createApplicationContext(AppModule)
它的任务是处理这些工作:
async function bootstrap(): Promise<void> {
const app = await NestFactory.createApplicationContext(AppModule);
const appConsumer = app.get(AppConsumer);
appConsumer.once(AppConsumerEvents.WorkCompleted, async () => {
process.exit(0);
await app.close();
});
}
bootstrap();
// file with consumer class
@Processor(APP_QUEUE_NAME)
export class AppConsumer extends EventEmitter {
public constructor(
@InjectQueue(APP_QUEUE_NAME)
private readonly appQueue: Queue<IAppQueueData>,
private readonly appService: AppService,
) {
super();
this.init();
}
private async init(): Promise<void> {
const waitingCount = await this.appQueue.getWaitingCount();
const delayed = await this.appQueue.getDelayedCount();
console.log(waitingCount, delayed);
if (waitingCount === 0) {
this.emit(AppConsumerEvents.WorkCompleted);
}
}
@Process()
public processJobs(job: Job<IAppQueueData>): Promise<void> {
const { namespace, token, message } = job.data;
console.log(`Job ${job.id} processed with worker ${ID}`);
return this.appService.publishMessage(message, token, namespace);
}
@OnGlobalQueueDrained()
public onGlobalQueueDrained(): void {
this.emit(AppConsumerEvents.WorkCompleted);
}
}
现在的问题是:因为主应用程序没有任何消费者正在运行,所以添加了一些延迟的作业会在队列中保持延迟,直到有一些消费者正在运行,它将作业从延迟提升到在指定时间过去后等待(这不是' t 在文档中的任何地方都明确说明,但它看起来像那样工作)。如果我在主应用程序中定义使用者,那么它会将作业从延迟提升到等待,但同时它正在处理它们,因此它们最终在队列中处于“完成”状态。我做了一个临时解决方案,在主应用程序中我创建了一个 cron 作业,它在指定的时间间隔内获取所有延迟的作业,检查作业是否可以提升,并提升它们。
@Cron(CronExpression.EVERY_30_SECONDS)
public async checkIfJobsShouldBePromoted(): Promise<void> {
const delayedJobs = await this.appQueue.getDelayed();
if (delayedJobs.length) {
for (const job of delayedJobs) {
if (Date.now() > job.timestamp + APP_QUEUE_DELAY) {
console.log(`job ${job.id} is ready to be promoted!`);
job.promote();
} else {
console.log(`job ${job.id} not ready to be promoted yet :(`);
}
}
}
}
它工作正常 - 但我想要另一种解决方案,无需手动操作。理想情况下,我想定义一个消费者,它的唯一责任是将工作提升到等待状态但不处理它们。阅读 Nest.js 和 Bull 文档我找不到这样的选项。这可以实现吗?
提前致谢。