0

我正在尝试设置一个简单的文档馈送器来对 ElasticSearch 进行基准测试,我选择了 NodeJS,因为我认为使用简单的 JSON 结构最容易。不幸的是,我似乎是在向自己开枪。

以下是相关位:

  var logResults = function (results) {
    docsIndexed++;
    var now = +new Date();
    if (docsIndexed % 10000 === 0) {
      log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
    }
  }

  var submitDocument = function (source, index, type) {
    var doc = ejs.Document(index, type).source(source);
    doc.doIndex(logResults);
  }

  var schemas = {};
    _(10).times(function (idx) {
    schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
  });

  var docCount = 0, docsIndexed = 0, start = +new Date();
  Object.keys(schemas).forEach(function (schemaName, idx) {
    var register = function () {
      submitDocument(generateRandomDocument(schemas[schemaName]), 
        'documents', schemaName);
      docCount++;
    };
    _((idx + 1) * 1000).times(register);
    log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema " 
      + schemaName + ". Total: " + docCount);
  });

这适用于多达 100 000 条记录的数据集,但如果我要数百万,它会因内存不足错误而炸毁我。

from的doIndex函数elastic.js是异步的,我怀疑许多对象在实际执行之前就已排队。当该数字变得重要时,该过程就会终止。我不明白为什么在循环结束之前没有执行回调。我想要的是一种使此同步或对其进行某种池化的方法,这样它就不会在发送其他对象之前将更多对象排队。

有人可以建议一个可以帮助解决此问题的库或更好的代码结构吗?谢谢。


更新

我已经尝试过 Peter 的使用 async.queue 的建议,我想出了这个:

  /** Submit QUANT * (idx + 1) documents for each schema into the index */ 
  var QUANT = 100
    , docCount = 0
    , docsIndexed = 0
    , queue = async.queue(submitDocument, 1000)
    , paused = false
    , start = +new Date();

  queue.saturated = function () {
    log("queue is full");
    paused = true;
  };
  queue.empty = function () {
    log("All items were given to workers");
    paused = false; 
  };

  Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0;
    while (count < (idx + 1) * QUANT) {
      if (!paused) {
        queue.push({ 
          source: generateRandomDocument(schemas[schemaName]), 
          index: 'documents', 
          type: schemaName 
        });
        count++; docCount++;
      }
    };
    log("Registered " + count + " documents for indexing for schema " 
      + schemaName + ". Total: " + docCount);
  });

如果它在循环中暂停,它将永远挂起(即被queue.saturated调用,paused 设置为 true,然后程序卡在 while 循环中)。queue.empty callback永远不会被调用。如果队列的并发限制高于我要处理的数字,这很好用 - 所有消息都按预期记录。我应该在这里改变什么?

更新#2

我已将代码更改为使用异步循环,现在它可以工作了。我遇到了一个RangeError: Maximum call stack size exceeded我挣扎了一段时间的错误。

  Object.keys(schemas).forEach(function (schemaName, idx) {
    var count = 0, executions = 0;
    async.whilst(
      function () { 
        var test = count < (idx + 1) * QUANT; 
        if (!test) log("Registered " + count + " documents for indexing for schema " 
          + schemaName + ". Executions: " + executions + ". Total: " + docCount);
        return test;
      },

      function (callback) {
        executions++;
        if (!paused) {
          queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName });
          count++; docCount++;
        }
        setTimeout(callback, 0);
        // also tried with "return process.nextTick(callback)"
        // and return callback();
        // this blows up nicely with an out of memory error
      },
      function (err) {}
    );
  });

我开始感到沮丧,因为我不认为这个用例真的那么复杂,我希望我对该语言的工作原理有一个公平的理解。

4

1 回答 1

1

最好的选择是async.queue有很大的限制。只要确保您不只是在队列已经饱和后继续将项目添加到队列中即可。使用队列的饱和度作为背压,等待一些工作完成后再开始排队更多任务。队列具有支持这些关键事件的钩子。

于 2013-06-19T18:29:22.740 回答