我正在尝试设置一个简单的文档馈送器来对 ElasticSearch 进行基准测试,我选择了 NodeJS,因为我认为使用简单的 JSON 结构最容易。不幸的是,我似乎是在向自己开枪。
以下是相关位:
var logResults = function (results) {
docsIndexed++;
var now = +new Date();
if (docsIndexed % 10000 === 0) {
log("Indexed " + docsIndexed + " documents in " + (now - start) + "ms");
}
}
var submitDocument = function (source, index, type) {
var doc = ejs.Document(index, type).source(source);
doc.doIndex(logResults);
}
var schemas = {};
_(10).times(function (idx) {
schemas[pickRandomWord()] = generateRandomDocumentSchema(_.random(idx, 15), 10);
});
var docCount = 0, docsIndexed = 0, start = +new Date();
Object.keys(schemas).forEach(function (schemaName, idx) {
var register = function () {
submitDocument(generateRandomDocument(schemas[schemaName]),
'documents', schemaName);
docCount++;
};
_((idx + 1) * 1000).times(register);
log("Registered " + ((idx + 1) * 1000) + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
这适用于多达 100 000 条记录的数据集,但如果我要数百万,它会因内存不足错误而炸毁我。
from的doIndex
函数elastic.js
是异步的,我怀疑许多对象在实际执行之前就已排队。当该数字变得重要时,该过程就会终止。我不明白为什么在循环结束之前没有执行回调。我想要的是一种使此同步或对其进行某种池化的方法,这样它就不会在发送其他对象之前将更多对象排队。
有人可以建议一个可以帮助解决此问题的库或更好的代码结构吗?谢谢。
更新
我已经尝试过 Peter 的使用 async.queue 的建议,我想出了这个:
/** Submit QUANT * (idx + 1) documents for each schema into the index */
var QUANT = 100
, docCount = 0
, docsIndexed = 0
, queue = async.queue(submitDocument, 1000)
, paused = false
, start = +new Date();
queue.saturated = function () {
log("queue is full");
paused = true;
};
queue.empty = function () {
log("All items were given to workers");
paused = false;
};
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0;
while (count < (idx + 1) * QUANT) {
if (!paused) {
queue.push({
source: generateRandomDocument(schemas[schemaName]),
index: 'documents',
type: schemaName
});
count++; docCount++;
}
};
log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Total: " + docCount);
});
如果它在循环中暂停,它将永远挂起(即被queue.saturated
调用,paused 设置为 true,然后程序卡在 while 循环中)。queue.empty callback
永远不会被调用。如果队列的并发限制高于我要处理的数字,这很好用 - 所有消息都按预期记录。我应该在这里改变什么?
更新#2
我已将代码更改为使用异步循环,现在它可以工作了。我遇到了一个RangeError: Maximum call stack size exceeded
我挣扎了一段时间的错误。
Object.keys(schemas).forEach(function (schemaName, idx) {
var count = 0, executions = 0;
async.whilst(
function () {
var test = count < (idx + 1) * QUANT;
if (!test) log("Registered " + count + " documents for indexing for schema "
+ schemaName + ". Executions: " + executions + ". Total: " + docCount);
return test;
},
function (callback) {
executions++;
if (!paused) {
queue.push({ source: generateRandomDocument(schemas[schemaName]), index: 'documents', type: schemaName });
count++; docCount++;
}
setTimeout(callback, 0);
// also tried with "return process.nextTick(callback)"
// and return callback();
// this blows up nicely with an out of memory error
},
function (err) {}
);
});
我开始感到沮丧,因为我不认为这个用例真的那么复杂,我希望我对该语言的工作原理有一个公平的理解。