我在 Node.js 中以多进程(多 worker)模式使用 puppeteer-cluster,但由于某种原因,只有一个 worker 进程按我定义的并发数打开了浏览器,其他 worker 的都被忽略了。我究竟做错了什么?
基本上,我为每个 worker 进程启动了一个并发数为 2 个浏览器的集群,所以我预期打开的浏览器总数是 worker 数量(即我定义的 CPU 核数)× 每个 worker 2 个浏览器,但实际上只打开了两个浏览器。
例如:假设我有 8 个内核,那么我会启动 8 个 worker,每个 worker 启动两个 puppeteer 浏览器,总共 16 个。然而,在 headless: true 模式下调试时,我看到只打开了两个浏览器。
依赖:"puppeteer": "^5.2.1","puppeteer-cluster": "^0.22.0"
import {Cluster} from 'puppeteer-cluster';
import {ReportTimeouts} from "../../config/reports.consts";
import {isDebug} from "../../utils/env.utils";
import {IPuppeteerClusterService} from "../../interfaces/services.interfaces";
/**
 * Holds the puppeteer-cluster instance created for the current process.
 * `init` has to finish before `getCluster` may be used.
 */
export default class PuppeteerClusterService implements IPuppeteerClusterService {
  private cluster;

  /**
   * Returns the cluster that `init` created.
   * @throws Error when no cluster has been stored yet.
   */
  public getCluster() {
    if (this.cluster) {
      return this.cluster;
    }
    throw new Error(`PuppeteerClusterService.getCluster: init didn't run`);
  }

  /**
   * Launches a puppeteer cluster (browser-level concurrency, at most two
   * concurrent jobs) and stores it on this instance.
   * The browser runs headless unless `isDebug` is set; in debug mode the
   * browser's stdio is also piped through (`dumpio`).
   */
  public async init() {
    const verbose = isDebug;
    const headless = !verbose;
    const concurrency = Cluster.CONCURRENCY_BROWSER;
    const maxConcurrency = 2;

    // Command-line flags handed to the browser executable as-is.
    const browserFlags = [
      '--no-first-run',
      '--no-zygote',
      '--no-sandbox',
      '--disable-extensions',
      '--disable-setuid-sandbox',
      '--disable-dev-shm-usage',
      '--ignore-certificate-errors',
      "--proxy-server='direct://'",
      '--proxy-bypass-list=*',
      '--lang=en-US,en',
    ];

    const puppeteerOptions = {
      headless,
      dumpio: verbose,
      handleSIGTERM: true,
      handleSIGINT: true,
      args: browserFlags,
    };

    const launched = await Cluster.launch({
      concurrency,
      maxConcurrency,
      puppeteerOptions,
      monitor: false, // turn this on to get cpu / memory usages
      timeout: ReportTimeouts.PuppeteerClusterTimeout,
    });

    console.log(`PuppeteerClusterService.init: initialized puppeteer cluster with concurrency type ${concurrency} and max concurrency of ${maxConcurrency}`);
    console.log('PuppeteerClusterService.init: running headless?: ', headless);

    // Only publish the cluster once launch and logging have completed.
    this.cluster = launched;
  }
}
import './src/services/monitoring/tracer';
import {config} from 'dotenv';
import * as process from 'process';
import * as http from 'http';
import * as cluster from 'cluster';
import * as os from 'os';
import App from './app';
import initORM from './src/config/sequelize_config.handler';
import routes from './src/routes';
import {DEFAULT_PORTS, REQUEST_TIMEOUT_MINUTES} from './src/config/networking.consts';
import {IServices} from "./src/interfaces/services.interfaces";
import SystemSetting from "./src/models/system_setting.model";
import Services from "./src/services";
import {ISystemSetting} from "./src/interfaces/models/system_setting.interface";
import ActiveReportSendingScheduler from "./src/logic/scheduled_tasks/active_report_sending.scheduler";
import appConfig from './src/config';
import LoggerService from './src/services/logger.service';
import DayTaggingReportSendingScheduler from './src/logic/scheduled_tasks/day_tagging_report_sending.scheduler';
import { MessageConsumingManager } from './src/logic/messaging/message_consuming.manager';
import { isDebug } from './src/utils/env.utils';
// Run dotenv's config() first so the process.env reads below see any values it provides.
config()
// Lower-cased NODE_ENV; falls back to 'dev' when NODE_ENV is unset or empty.
const env = process.env.NODE_ENV?.toLocaleLowerCase() || 'dev';
// monitorServer is a hoisted function declaration defined further down in this file.
monitorServer(env, process);
// Ports the server listens on; HTTP_PORT (parsed as a number) overrides the configured default.
const ports = {
http: process.env.HTTP_PORT ? Number(process.env.HTTP_PORT) : DEFAULT_PORTS.http,
};
// Worker references pushed by setupWorkerProcesses when it forks.
const workers = [];
/**
 * Initializes the ORM via `initORM` with the shared logger and reports
 * progress on the console.
 *
 * Only the NAMES of the environment variables are logged. Their values can
 * hold credentials (API keys, database passwords) and must not end up in
 * log output.
 */
function setupOrm() {
  const logger = LoggerService;
  console.info(`setupOrm: app initiating on env ${env}`);
  initORM({logger});
  console.info(`setupOrm: sequelize ORM initiated`);
  console.info('setupOrm: loading static tables into memory');
  // Sorted so the line is stable between runs and easy to diff.
  const definedEnvNames = Object.keys(process.env).sort();
  console.info(`setupOrm: env params defined: ${definedEnvNames.join(', ')}`);
}
/**
 * Forks one worker process per CPU core and keeps that pool at a constant
 * size: whenever a worker exits, exactly one replacement is forked.
 *
 * Every forked worker is recorded in the module-level `workers` array and has
 * its IPC messages forwarded to the logger. Workers that exited are removed
 * from `workers` so the array only holds live references.
 *
 * @param services service container; only `services.logger` is used here
 */
const setupWorkerProcesses = (services:IServices) => {
  const numCores = os.cpus().length;
  services.logger.info('setupWorkerProcesses: master cluster setting up ' + numCores + ' workers');

  // Forks a single worker, tracks it and relays the messages it sends to the logger.
  const spawnWorker = () => {
    const worker = cluster.fork();
    workers.push(worker);
    worker.on('message', function(message) {
      services.logger.info(message);
    });
    return worker;
  };

  // process is clustered on a core and process id is assigned
  cluster.on('online', function(worker) {
    services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' is listening');
  });

  // if any of the worker processes dies, replace it with ONE new worker
  cluster.on('exit', function(worker, code, signal) {
    services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
    // Drop the dead reference so `workers` does not grow without bound.
    const deadIndex = workers.indexOf(worker);
    if (deadIndex !== -1) {
      workers.splice(deadIndex, 1);
    }
    services.logger.info('setupWorkerProcesses: starting a new worker');
    spawnWorker();
  });

  // utilize every core: one worker each
  for (let i = 0; i < numCores; i++) {
    spawnWorker();
  }
};
/**
 * Builds the application through `App.init`, logs the startup configuration
 * and starts an HTTP server for it on `ports.http`.
 *
 * @param services       service container handed to the app; its logger is used for all output
 * @param systemSettings system settings handed to the app
 */
async function setApp(services:IServices, systemSettings: ISystemSetting[]) {
  const { logger } = services;
  const app = await App.init({routes, services, systemSettings, env});

  logger.info(`setApp: app initiated on env ${env}`);
  logger.info(`setApp: app initiated with config ${JSON.stringify(appConfig)}`);
  logger.info(`setApp: routes: ${Object.keys(routes).join(' | ')}`);
  logger.info(`setApp: ports: ${JSON.stringify(ports)}`);
  logger.info(`setApp: database connection gained`);

  const httpServer = http.createServer(app);
  httpServer.listen(ports.http, async () => {
    logger.info(`setApp: HTTP Server successfully started at port ${ports.http}`);
  });

  const requestTimeoutMs = REQUEST_TIMEOUT_MINUTES * 60 * 1000;
  // Time (in ms) server will wait and keep the connection open after last response.
  httpServer.keepAliveTimeout = requestTimeoutMs;
  // Kept one second above keepAliveTimeout: https://github.com/nodejs/node/issues/27363#issuecomment-603489130
  httpServer.headersTimeout = requestTimeoutMs + 1000;
}
/**
 * Creates the report-sending schedulers and calls `init` on each of them.
 *
 * @param services       service container passed to every scheduler
 * @param systemSettings system settings passed to every scheduler
 */
function setSchedulers(services:IServices, systemSettings: ISystemSetting[]) {
  // Both schedulers are constructed before either one is initialized.
  const activeReportScheduler = new ActiveReportSendingScheduler(services,systemSettings);
  const dayTaggingScheduler = new DayTaggingReportSendingScheduler(services,systemSettings);

  for (const scheduler of [activeReportScheduler, dayTaggingScheduler]) {
    scheduler.init();
  }

  services.logger.info(`setSchedulers: schedulers initiated on env ${env}`);
}
/**
 * Creates a `MessageConsumingManager` and calls `initialize` on it.
 *
 * @param services       service container passed to the manager
 * @param systemSettings system settings passed to the manager
 */
function setMessageConsumers(services:IServices, systemSettings: ISystemSetting[]){
  const consumingManager = new MessageConsumingManager(services, systemSettings);
  consumingManager.initialize();
  services.logger.info(`setMessageConsumers: message consumers initiated on env ${env}`);
}
/**
 * Boots this process in multi-process mode.
 *
 * Every process (master and workers alike) sets up the ORM, loads the system
 * settings that apply to the current `env` and initializes the services.
 * After that the master forks the workers and starts the schedulers and
 * message consumers, while each worker starts the HTTP app.
 */
const setupServer = async () => {
  console.info(`setupServer: initating app in multiprocess mode`);
  setupOrm();

  // A setting applies when it has no env, or when its env and the current env contain one another.
  const appliesToCurrentEnv = (s:SystemSetting) => !s.env || s.env.includes(env) || env.includes(s.env);
  const allSettings = await SystemSetting.findAll();
  const systemSettings = allSettings.filter(appliesToCurrentEnv);

  const services: IServices = new Services(systemSettings, process.env);
  await services.init();
  services.logger.info(`setupServer: initiated app in multi process mode`);

  if (!cluster.isMaster) {
    await setApp(services, systemSettings);
    return;
  }

  setupWorkerProcesses(services);
  setSchedulers(services, systemSettings);
  setMessageConsumers(services, systemSettings);
};
/**
 * Loads the New Relic agent, but only for production runs that provide the
 * required settings. In every other case a warning is printed and nothing is
 * loaded.
 *
 * @param env  environment name; any value other than 'production' skips monitoring
 * @param proc process whose environment variables (`NEW_RELIC_KEY`, `APP_NAME`) are checked
 */
function monitorServer(env:string, proc: NodeJS.Process){
  if (env !== 'production'){
    console.warn(`monitorServer - not production so no monitoring.`);
    return;
  }
  // Read from the process that was passed in rather than the global one,
  // so the parameter is actually honoured.
  const { NEW_RELIC_KEY, APP_NAME } = proc.env;
  if (!NEW_RELIC_KEY){
    console.warn(`monitorServer - NEW_RELIC_KEY not provided, not loading.`);
    return;
  }
  if (!APP_NAME){
    console.warn(`monitorServer - APP_NAME not provided, not loading.`);
    return;
  }
  // Required lazily so the module is only loaded once all checks have passed.
  const newRelic = require('newrelic');
  console.info(`monitorServer - newrelic loaded: ${typeof newRelic === 'object'}`);
}
// Start the process. A failed startup is logged and ends the process with a
// non-zero exit code instead of being left as a floating promise rejection.
setupServer().catch((err) => {
  console.error('setupServer: failed to start', err);
  process.exit(1);
});
import {
IAnalyticsService,
ICacheService,
ICycleTaggingService,
IEmailSendingService,
IFileUploader,
ILogger,
IMonitoringService,
IPowerBIAuthService,
IPowerBIService,
IPuppeteerClusterService,
IReportMonitoringServiceFactory,
IServices,
ISiteService,
IUserService
} from "../interfaces/services.interfaces";
import {ISystemSetting} from "../interfaces/models/system_setting.interface";
import CacheService from "./cache.service";
import LoggerService from './logger.service';
import {S3FileUploader} from "./persistence/s3_file.uploader";
import {EmailSendingService} from "./sendouts/email_sending.service";
import {AdminUserService} from "./external_models/user.service";
import {ModerationService} from "./sendouts/moderation.service";
import {PowerBIAuthService} from "./power_bi/power_bi_auth.service";
import {PowerBIService} from "./power_bi/power_bi.service";
import {AdminSiteService} from "./external_models/site.service";
import {CycleTaggingService} from "./cycle_tagging.service";
import MonitoringService from "./monitoring/monitoring.service";
import ReportMonitoringServiceFactory from "./monitoring/report_monitoring.service.factory";
import {AutoSendoutCalculatorFactory} from "../logic/sendout/auto_sendout_calculator.factory";
import PuppeteerClusterService from "./screenshots/puppeteer_cluster.service";
import {RedisFactory} from "./redis.factory";
import {MutexFactory} from "./mutex.factory";
import {IMutexFactory} from "../interfaces/general.interfaces";
import {AnalyticsService} from "./analytics/analytics.service";
import {DummyAnalyticsService} from "./analytics/dummy.service";
import {IAutoSendoutCalculatorFactory} from "../interfaces/sendouts.interface";
/**
 * Service container. The part visible here creates the puppeteer cluster
 * service in the constructor and initializes it in `init`.
 */
export default class Services implements IServices {
  puppeteerClusterService: IPuppeteerClusterService;

  /**
   * @param systemSettings system settings (not read by the code shown here)
   * @param processEnv     environment variables (not read by the code shown here)
   */
  constructor(systemSettings: ISystemSetting[], processEnv: Record<string, string | undefined>){
    const clusterService = new PuppeteerClusterService();
    this.puppeteerClusterService = clusterService;
  }

  /** Waits for the puppeteer cluster service to finish its own `init`. */
  public async init(): Promise<void> {
    const { puppeteerClusterService } = this;
    await puppeteerClusterService.init();
  }
}