I have set up `kue` to run with the Node.js `cluster` module, which spawns one `kue` child process
for every available CPU core.
When the scheduler inserts the `every`
keys into Redis, everything seems to be in order - there is only a single key set for each `every`
run.
However, when the time to trigger the job processing comes, all child processes (workers) start handling the processing logic, which results in the job having multiple instances triggered from a single "scheduler" entry.
This symptom does not occur when a new job is triggered programmatically in `kue`,
nor does it occur when the `kue`
API is used to do so.
Please advise.
Main Bootstrap Code
// Cluster entry point: the master process starts the web UI and the scheduler
// and forks one worker per CPU core; every worker loads the job processors.
var cluster = require('cluster');
var cpuCount = require('os').cpus().length;

// Logs the pid of a worker that has come online.
function reportOnline(worker) {
  console.log('Worker ' + worker.process.pid + ' is online');
}

// Logs why a worker exited and forks a replacement for it.
function replaceWorker(worker, code, signal) {
  console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
  console.log('Starting a new worker');
  cluster.fork();
}

if (!cluster.isMaster) {
  // Worker process: only registers the job processing handlers.
  require("./job_types");
} else {
  // Master process: init.js is loaded first, then the workers are forked.
  require('./init.js');

  for (var remaining = cpuCount; remaining > 0; remaining--) {
    cluster.fork();
  }

  cluster.on('online', reportOnline);
  cluster.on('exit', replaceWorker);

  // The scheduler is loaded in the master only, after the workers are forked.
  require('./scheduler.js');
}
init.js
/// Module Dependency
var cors = require('cors');
var kue = require('kue-scheduler');
var express = require('express');
var bodyParser = require('body-parser');
var config = require('./configs/config');
var redis = require("redis");

// Dedicated Redis connection, used below only to flush old data at startup.
var client = redis.createClient();
// Queue instance; also the handle used for the graceful shutdown below.
var jobs = kue.createQueue();

require('./routes');

// Clearing redis for clean startup
// NOTE(review): FLUSHALL removes the keys of every database on the Redis
// server, not only kue's own keys - confirm this instance is dedicated to kue.
console.log('Clearing old Redis data...');
client.flushall();

/// Webserver
var corsOptions = {origin: '*'};
var app = express();

app.use(cors(corsOptions));
app.options('*', cors(corsOptions));
app.use(bodyParser.json());
// Mounts kue's bundled express application on this server.
app.use(kue.app);

app.listen(config.env.port, function () {
  var host = config.env.host;
  var port = config.env.port;
  console.log('[' + process.pid + '] Monitoring kue listening at http://%s:%s', host, port);
});

// Handling safe shutdown
process.once('SIGTERM', function (sig) {
  // shutdown() is a method of the queue instance, not of the kue module
  // export; calling it on the module throws a TypeError, so the process
  // would never drain its jobs or exit on SIGTERM.
  jobs.shutdown(5000, function (err) {
    console.log('[' + process.pid + '] Kue shutdown: ', err || '');
    process.exit(0);
  });
});

// Last-resort handler: logs the error and its stack, then lets the process
// keep running.
// NOTE(review): Node's documentation advises against resuming normal
// operation after an uncaught exception - confirm that staying alive here is
// intentional.
process.on('uncaughtException', function (err) {
  console.log('[' + process.pid + '] ' + err);
  console.log('[' + process.pid + '] ' + err.stack);
});
scheduler.js
// Registers the recurring jobs and logs the outcome of every scheduling attempt.
var kueScheduler = require('kue-scheduler');
var queue = kueScheduler.createQueue();

// Builds the "[pid] " prefix that starts every log line of this module.
function pidPrefix() {
  return '[' + process.pid + '] ';
}

// Formats a job as "type (id)" for log output.
function describeJob(job) {
  return job.type + ' (' + job.id + ')';
}

// Set specific job scheduling here
var socialEntitiesJob = queue.createJob('getSocialEntities').attempts(3).priority('normal');
queue.every('1 minutes', socialEntitiesJob);

// General scheduler event handling (diagnostic logging only)
queue.on('already scheduled', function onAlreadyScheduled(job) {
  console.log(pidPrefix() + 'job is already scheduled: ' + describeJob(job));
});

queue.on('schedule success', function onScheduleSuccess(job) {
  console.log(pidPrefix() + 'job scheduled: ' + describeJob(job));
});

queue.on('schedule error', function onScheduleError(error) {
  console.error(pidPrefix() + 'failed scheduling job');
  console.error(error);
});
job_types - contains the `kue`
`jobs.process('job type', ...)`
calls that handle all of the job logic itself.