3

我目前正在解析一个 js 对象列表,这些对象被一个一个地插入到数据库中,与 Node.js 大致类似:

return promise.map(list,
    return parseItem(item)
        .then(upsertSingleItemToDB)
    ).then(all finished!)

问题是当列表大小变得非常大(约 3000 个项目)时,并行解析所有项目会占用大量内存。使用 Promise 库添加并发限制非常容易,并且不会以这种方式耗尽内存(when/guard)。

但我也想优化 db upserts,因为 mongodb 提供了 bulkWrite 函数。由于一次解析和批量写入所有项目是不可能的,所以我需要将原始对象列表拆分为较小的集合,这些集合使用 Promise 并行解析,然后该集合的结果数组将传递给 Promisified bulkWrite。如果列表项,这将对剩余的集合重复。

我很难思考如何构建较小的承诺集,以便我一次只执行一组 parseSomeItems-BulkUpsertThem(类似于 Promise.all([set1Bulk][set2Bulk]),其中 set1Bulk是另一个并行解析器承诺数组吗?),任何伪代码帮助都将不胜感激(但我正在使用 when 如果这会有所不同)。

4

2 回答 2

1

如果使用 mongoose 和底层的 nodejs-mongodb-driver,它可能看起来像这样:

const saveParsedItems = items => ItemCollection.collection.bulkWrite( // accessing underlying driver
   items.map(item => ({
      updateOne: {
           filter: {id: item.id}, // or any compound key that makes your items unique for upsertion
           upsert: true,
           update: {$set: item} // should be a key:value formatted object
      }
   }))
);


const parseAndSaveItems = (items, offset = 0, limit = 3000) => { // the algorithm for retrieving items in batches be anything you want, basically
  const itemSet = items.slice(offset, limit);
  
  return Promise.all(
    itemSet.map(parseItem) // parsing all your items first
  )
    .then(saveParsedItems)
    .then(() => {
      const newOffset = offset + limit;
      if (items.length >= newOffset) {
        return parseAndSaveItemsSet(items, newOffset, limit);
      }
      
      return true;
    });
};

return parseAndSaveItems(yourItems);

于 2017-04-04T15:00:02.077 回答
1

第一个答案看起来很完整。然而,这里有一些其他的想法浮现在脑海中。

作为一种解决方法,您可以在执行下一个写操作之前在写操作的回调中调用超时函数。这可以让您的 CPU 和内存在调用之间中断。即使您在调用之间增加一毫秒,如果您总共有 3000 个写入对象,那也只会增加 3 秒。

或者您可以分割您的 insertObjects 数组,并将它们发送到他们自己的批量写入器。

于 2017-04-04T16:04:32.083 回答