20

我有一个包含一系列承诺的数组,每个内部数组可以有 4k、2k 或 500 个承诺。

总共有大约 60k 个 promise,我也可以用其他值对其进行测试。

现在我需要执行Promise.all(BigArray[0]).

一旦第一个内部数组完成,我需要执行下一个Promise.all(BigArray[1])等等。

如果我尝试执行Promise.all(BigArray)它会抛出:

fatal error call_and_retry_2 allocation failed - process out of memory

我需要依次执行每个 Promise,而不是并行执行,我认为这就是 Node 所做的。我不应该使用新的库但是愿意考虑答案!。

编辑:

这是一段示例代码:

function getInfoForEveryInnerArgument(InnerArray) {
    const CPTPromises = _.map(InnerArray, (argument) => getDBInfo(argument));
    return Promise.all(CPTPromises)
        .then((results) => {
            return doSomethingWithResults(results);
        });
}
function mainFunction() {
    BigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    //the summ of all arguments is over 60k...
    const promiseArrayCombination = _.map(BigArray, (InnerArray, key) => getInfoForEveryInnerArgument(InnerArray));

    Promise.all(promiseArrayCombination).then((fullResults) => {
        console.log(fullResults);
        return fullResults;
    })
}
4

6 回答 6

15

你的问题有点名不副实,这可能会让一些人在这个问题和这个问题的先前版本中感到困惑。您正在尝试连续执行一批异步操作,一批操作,然后在完成后执行另一批操作。这些异步操作的结果通过 Promise 进行跟踪。Promise 本身代表已经开始的异步操作。“承诺”不会自己执行。所以从技术上讲,你不会“连续执行一批承诺”。你执行一组操作,用 Promise 跟踪它们的结果,然后在第一批完成后执行下一批。

无论如何,这里有一个序列化每批操作的解决方案。

您可以创建一个我通常调用的内部函数next(),让您处理每次迭代。当 promise 从处理一个 innerArray 解决时,您next()再次调用:

function mainFunction() {
    return new Promise(function(resolve, reject) {
        var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
        //the summ of all arguments is over 60k...
        var results = [];

        var index = 0;
        function next() {
            if (index < bigArray.length) {
                getInfoForEveryInnerArgument(bigArray[index++]).then(function(data) {
                    results.push(data);
                    next();
                }, reject);
            } else {
                resolve(results);
            }
        }
        // start first iteration
        next();
    });
}

这还将所有子结果收集到一个结果数组中,并返回一个主承诺,其解析值就是这个结果数组。所以,你可以这样使用:

mainFunction().then(function(results) {
    // final results array here and everything done
}, function(err) {
    // some error here
});

您还可以使用.reduce()设计模式串行迭代数组:

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    return bigArray.reduce(function(p, item) {
        return p.then(function(results) {
            return getInfoForEveryInnerArgument(item).then(function(data) {
                results.push(data);
                return results;
            })
        });
    }, Promise.resolve([]));
}

这比第一个选项创建了更多的同时承诺,我不知道这对于如此大的承诺是否是一个问题(这就是我提供原始选项的原因),但是这段代码更干净,这个概念使用起来很方便也适用于其他情况。


仅供参考,有一些承诺附加功能可以为您执行此操作。在Bluebird Promise 库(这是一个使用 Promise 进行开发的优秀库)中,它们Promise.map()为此而生:

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8], ....];
    return Promise.map(bigArray, getInfoForEveryInnerArgument);

}
于 2016-05-13T16:03:10.840 回答
14

2020 年 10 月的答案。Async/await 很短:只有 10 行代码 + JSDoc。

/**
 * Same as Promise.all(items.map(item => task(item))), but it waits for
 * the first {batchSize} promises to finish before starting the next batch.
 *
 * @template A
 * @template B
 * @param {function(A): B} task The task to run for each item.
 * @param {A[]} items Arguments to pass to the task for each call.
 * @param {int} batchSize
 * @returns {B[]}
 */
async promiseAllInBatches(task, items, batchSize) {
    let position = 0;
    let results = [];
    while (position < items.length) {
        const itemsForBatch = items.slice(position, position + batchSize);
        results = [...results, ...await Promise.all(itemsForBatch.map(item => task(item)))];
        position += batchSize;
    }
    return results;
}
于 2020-10-26T18:37:47.843 回答
3

@jfriend00async/await只需使用with添加到您的答案中reduce

function runPromisesInSeries(bigArray, getInfoForEveryInnerArgument) {
  try {
    return bigArray.reduce(async (acc, cItem) => {
      const results = await acc
      const data = await getInfoForEveryInnerArgument(cItem)
      results.push(data)
      return results
    }, Promise.resolve([]))
  } catch (err) {
    throw err
  }
}
于 2018-10-19T09:19:15.303 回答
2

此外,如果原始数组不是 promises 而是应该处理的对象,则可以使用Array.prototype.map(),Array.prototype.slice()和的组合在没有外部依赖关系的情况下完成批处理Promise.all()

// Main batch parallelization function.
function batch(tasks, pstart, atonce, runner, pos) {
  if (!pos) pos = 0;
  if (pos >= tasks.length) return pstart;
  var p = pstart.then(function() {
    output('Batch:', pos / atonce + 1);
    return Promise.all(tasks.slice(pos, pos + atonce).map(function(task) {
      return runner(task);
    }));
  });
  return batch(tasks, p, atonce, runner, pos + atonce);
}

// Output function for the example
function output() {
  document.getElementById("result").innerHTML += Array.prototype.slice.call(arguments).join(' ') + "<br />";
  window.scrollTo(0, document.body.scrollHeight);
}

/*
 * Example code.
 * Note: Task runner should return Promise.
 */
function taskrunner(task) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      output('Processed:', task.text, 'Delay:', task.delay);
      resolve();
    }, task.delay);
  });
}

var taskarray = [];
function populatetasks(size) {
  taskarray = [];
  for (var i = 0; i < size; i++) {
    taskarray.push({
      delay: 500 + Math.ceil(Math.random() * 50) * 10,
      text: 'Item ' + (i + 1)
    });
  }
}

function clean() {
  document.getElementById("result").innerHTML = '';
}

var init = Promise.resolve();
function start() {
  var bsize = parseInt(document.getElementById("batchsize").value, 10),
    tsize = parseInt(document.getElementById("taskssize").value, 10);
  populatetasks(tsize);
  init = batch(taskarray.slice() /*tasks array*/ , init /*starting promise*/ , bsize /*batch size*/ , taskrunner /*task runner*/ );
}
<input type="button" onclick="start()" value="Start" />
<input type="button" onclick="clean()" value="Clear" />&nbsp;Batch size:&nbsp;
<input id="batchsize" value="4" size="2"/>&nbsp;Tasks:&nbsp;
<input id="taskssize" value="10" size="2"/>
<pre id="result" />

于 2016-11-28T18:16:51.723 回答
0

你可以递归地做,例如这里我需要在mongo中放大约60k个文档,但是它太大了,一步完成,所以我拿了1k个文档,把它们发送到mongo,完成后我拿另一个 1k 文件等。

exports.rawRecursive = (arr, start) => {
        //ending condition
        if (start > arr.length) {
            return;
        }

        Rawmedicament.insertManyAsync(_.slice(arr, start, start + 1000)).then(() => {
            //recursive
            exports.rawRecursive(arr, start + 1000);
        });
};

如果你想注意,当一切都完成后,你可以在结束条件下放置回调,或者如果你喜欢 Promises,你可以在那里调用 resolve()。

于 2016-05-13T15:07:58.493 回答
0

动态批处理更多的 Promise

一个简单的实现,您可以将任务队列批处理以并行运行并更动态地添加:

class TaskQueue {
  constructor ({
    makeTask,
    initialData = [],
    getId = data => data.id,
    batchSize = 15,
    onComplete = () => {},
  }) {
    if (!makeTask) throw new Error('The "makeTask" parameter is required');

    this.makeTask = makeTask;
    this.getId = getId;
    this.batchSize = batchSize;
    this.onComplete = onComplete;
    this.queue = new Map();

    this.add(initialData);
  }

  add(...data) {
    data.forEach(item => {
      const id = this.getId(item);
      if (this.queue.has(id)) return;

      this.queue.set(id, item);
    });

    // running automatically on create or additional items added
    this.runNextBatch();
  }

  runNextBatch () {
    if (this.queueStarted) return;
    if (this.queue.size === 0) return;

    this.queueStarted = true;
    const currentBatchData = Array.from(this.queue.values()).slice(0, this.batchSize);

    const tasks = currentBatchData.map(data => {
      const id = this.getId(data);

      // Have some error handling implemented in `makeTask`
      this.makeTask(data)
        .finally(() => this.queue.delete(id));
    });

    return Promise.all(tasks)
      .then(() => {
        this.queueStarted = false;
        this.runNextBatch();
      })
      .finally(() => {
        this.queueStarted = false;
        if (this.queue.size === 0) this.onComplete();
      });
  }
}

// Usage
const lotOfFilesForUpload = [{ uri: 'file://some-path' }, { uri: 'file://some-other-path' }];

const upload = (file) => console.log('fake uploading file: ', file);

const taskQueue = new TaskQueue({
  initialData: lotOfFilesForUpload,
  getId: file => file.uri,
  makeTask: file => upload(file),
  onComplete: () => console.log('Queue completed'),
});

// You can add more tasks dynamically
taskQueue.add({ uri: 'file://yet-another-file' });

于 2020-05-12T16:53:02.987 回答