0

我正在尝试使用nest.js 中的BullQueue 包来实现一个队列。我有两个队列,一个是文件队列,另一个是电子邮件队列。

上传缩略图后,我使用文件队列在后台重新生成缩略图。和电子邮件队列发送电子邮件。问题是我的文件队列工作得很好,但是我创建的电子邮件队列给了我一个不一致的结果。有时工作会被该@Process('JobName')功能接手,有时则不会。

更重要的是,我注意到当问题发生时,我尝试删除 dist 文件夹并重新启动重新生成 dist 目录的服务器。它工作,直到它没有经过几次试验。

这是我的文件夹结构。

my-nest-app/
├─ src/
│  ├─ bull-queue/
│  │  ├─ bull-queue.module.ts
│  │  ├─ file-queue/
│  │  │  ├─ file-queue-producer.service.ts
│  │  │  ├─ file-queue-consumer.service.ts
│  │  ├─ email-queue/
│  │  │  ├─ email-producer.service.ts
│  │  │  ├─ email-processor.service.ts

公牛队列.module.ts

export enum BullQueue {
    FileQueue = 'file-queue',
    EmailQueue = 'email-queue',
}

@Global()
@Module({
    imports: [
        BullModule.forRoot({
            redis: {
                host: process.env.REDIS_HOST,
                port: parseInt(process.env.REDIS_PORT) || 6379
            }
        }),
        BullModule.registerQueue(
            { name: BullQueue.FileQueue },
            { name: BullQueue.EmailQueue }
        ),
        ProjectModule,
        FileModule
    ],
    providers: [
        BullQueueService,
        FileQueueConsumerService,
        FileQueueProducerService,
        EmailProcessorService,
        EmailProducerService
    ],
    controllers: [],
    exports: [FileQueueProducerService, EmailProducerService]
})
export class BullQueueModule {}

电子邮件-producer.service.ts

export enum EmailJobs {
    PasswordReset = 'password_reset',
}

@Injectable()
export class EmailProducerService {
    constructor(@InjectQueue(BullQueue.EmailQueue) private queue:Queue){}

    async password_reset_email(payload: PasswordResetPayload){
        await this.queue.add(EmailJobs.PasswordReset, payload);
    }

}

电子邮件处理器.service.ts

@Processor(BullQueue.EmailQueue)
export class EmailProcessorService {

    constructor(
        private readonly loggerService: LoggerService,
        private readonly helperService: HelperService
    ) {}



    @Process(EmailJobs.PasswordReset)
    async password_reset(job: Job<PasswordResetPayload>) {
        console.log('password_reset Job Processor')
    }
}

我想知道我是否违反了任何设计原则,或者这只是队列包管理器的问题。

4

0 回答 0