0

我有一个脚本可以执行以下操作

  1. 从 MongoDB 加载数据,其中计数器false有限制
  2. 为加载的数据集处理计数器更新 MongoDBtrue
  3. 对于每条记录,处理它们并将它们插入到新集合中。

数据太多,因此需要工人加快流程。

问题 - 有时工作人员正在使用其他工作人员正在使用的相同加载数据。这会导致新集合中的重复记录。

我想避免这种情况。

这是我迄今为止所做的,以使我的员工保持敬业度,直到工作完成

if (cluster.isPrimary) {
    //Is Master Process
    for (let i = 0; i < numCPUs; i++) {
        forkWorker();
    }
} else {
    //Is Child Process
    (async () => {
        await my_module.main(process);  //Execute Business Logic
    }
}

async function forkWorker() {
    let worker = cluster.fork();
    worker.on('exit', (code, signal) => handleWorkerDeath(worker, code, signal));
}
function handleWorkerDeath(worker, code, signal) {
    if (code === 0) {
        console.log(`Worker ${worker.process.pid} died peacefully shutting down master`);
        process.exit();
    } else {
        console.log(`Worker ${worker.process.pid} died with error code ${code},restarting worker`);
        forkWorker();
    }
}

这是我的主要功能

const db = await mongoPoolPromise();
var results = await db
     .collection(collection_name)
     .find({
         last_scraped: null,
         is_parsed: false,
     })
     .limit(100)
     .toArray();
if(results.length > 0) {
    console.log(results[0]._id);
    var ids = await pluckIds(results, '_id');
    if(ids.length > 0) {
        var update_status = await this.updateUrlsToProcessing(ids);
        if(update_status){
             for(var urlData of results) {
                 await this.processArticle(urlData);
             }
             var status = await this.saveArticles();
         }else{
             console.log('Not updating');
         }
     process.exit(1);
} else {
    process.exit(0);
}
          
4

0 回答 0