我有一个脚本可以执行以下操作
- 从 MongoDB 加载数据,其中计数器
false
有限制 - 为加载的数据集处理计数器更新 MongoDB
true
- 对于每条记录,处理它们并将它们插入到新集合中。
数据太多,因此需要工人加快流程。
问题 - 有时工作人员正在使用其他工作人员正在使用的相同加载数据。这会导致新集合中的重复记录。
我想避免这种情况。
这是我迄今为止所做的,以使我的员工保持敬业度,直到工作完成
if (cluster.isPrimary) {
//Is Master Process
for (let i = 0; i < numCPUs; i++) {
forkWorker();
}
} else {
//Is Child Process
(async () => {
await my_module.main(process); //Execute Business Logic
}
}
async function forkWorker() {
let worker = cluster.fork();
worker.on('exit', (code, signal) => handleWorkerDeath(worker, code, signal));
}
function handleWorkerDeath(worker, code, signal) {
if (code === 0) {
console.log(`Worker ${worker.process.pid} died peacefully shutting down master`);
process.exit();
} else {
console.log(`Worker ${worker.process.pid} died with error code ${code},restarting worker`);
forkWorker();
}
}
这是我的主要功能
const db = await mongoPoolPromise();
var results = await db
.collection(collection_name)
.find({
last_scraped: null,
is_parsed: false,
})
.limit(100)
.toArray();
if(results.length > 0) {
console.log(results[0]._id);
var ids = await pluckIds(results, '_id');
if(ids.length > 0) {
var update_status = await this.updateUrlsToProcessing(ids);
if(update_status){
for(var urlData of results) {
await this.processArticle(urlData);
}
var status = await this.saveArticles();
}else{
console.log('Not updating');
}
process.exit(1);
} else {
process.exit(0);
}