1
  • 我有一个使用 Bullmq 队列、调度程序和工作程序的快速应用程序。即使按下 Ctrl + CI 后,仍然可以看到节点进程在我的活动管理器中运行,但终端上的服务器关闭。我知道这一点,因为即使在服务器关闭到终端之后,bulmq 任务也开始输出 console.log 语句。

这就是我的 server.js 文件的样子

// eslint-disable-next-line import/first
import http from 'http';
import { app } from './app';
import { sessionParser } from './session';
import { websocketServer } from './ws';
import 'jobs/repeatable';

const server = http.createServer(app);

server.on('upgrade', (request, socket, head) => {
  sessionParser(request, {}, () => {
    websocketServer.handleUpgrade(request, socket, head, (ws) => {
      websocketServer.emit('connection', ws, request);
    });
  });
});

server.on('listening', () => {
  websocketServer.emit('listening');
});

server.on('close', () => {
  websocketServer.emit('close');
});

// https://stackoverflow.com/questions/18692536/node-js-server-close-event-doesnt-appear-to-fire
process.on('SIGINT', () => {
  server.close();
});

export { server };

请注意,我在上面定义了一个 SIGINT 处理程序。这是我的工作没有退出的原因吗?我是否必须手动关闭 SIGINT 中的每个队列、工作程序和调度程序?我的 jobs/repeatable.js 文件如下所示

const { scheduleJobs } = require('jobs');

if (process.env.ENABLE_JOB_QUEUE === 'true') {
  scheduleJobs();
}

这是我的jobs.js 文件

import { scheduleDeleteExpiredTokensJob } from './delete-expired-tokens';
import { scheduleDeleteNullVotesJob } from './delete-null-votes';

export async function scheduleJobs() {
  await scheduleDeleteExpiredTokensJob();
  await scheduleDeleteNullVotesJob();
}

这是我的 delete-expired-tokens.js 文件,另一个非常相似

import { processor as deleteExpiredTokensProcessor } from './processor';
import { queue as deleteExpiredTokensQueue } from './queue';
import { scheduler as deleteExpiredTokensScheduler } from './scheduler';
import { worker as deleteExpiredTokensWorker } from './worker';

export async function scheduleDeleteExpiredTokensJob() {
  const jobId = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_ID;
  const jobName = process.env.QUEUE_DELETE_EXPIRED_TOKENS_JOB_NAME;
  await deleteExpiredTokensQueue.add(jobName, null, {
    repeat: {
      cron: process.env.QUEUE_DELETE_EXPIRED_TOKENS_FREQUENCY,
      jobId,
    },
  });
}

export {
  deleteExpiredTokensProcessor,
  deleteExpiredTokensQueue,
  deleteExpiredTokensScheduler,
  deleteExpiredTokensWorker,
};

如何优雅地关闭 Bullmq 任务队列?

4

1 回答 1

1

你必须调用close()工人的方法:

server.on('close', async () => {
  websocketServer.emit('close');
  // Close the workers
  await worker.close()
});

文档

于 2022-02-18T08:11:53.130 回答