我有一个 node.js 程序,它使用流来读取文件(nodejs 事件流为每个流设置一个变量)
我想使用相同的程序将此数据写入弹性搜索。我写了一个小写函数
var writeFunction = function(data) {
//console.log(data);
var client = request.newClient("http://localhost:9200");
client.post('/newtest3/1',data,function(err,res,body) {
return console.log(res.statusCode);
});
};
并将其与流媒体联系起来
var processMyFile = function(file) {
var stream = getStream(file);
var nodeName = stream.nodeName;
stream
.pipe(es.split())
.on('end',endFunction)
.pipe(es.map(function(data,cb) {
processFunction(nodeName,data,cb);
}))
.pipe(es.map(function(data,cb) {
writeFunction(data);
}));
}
以上按预期异步工作并写入数据,除了它需要很长时间。它似乎也可以作为缓冲区工作,因为写入需要比读取更长的时间。(使用管道的优势)我知道有一个弹性搜索中的批量接口,我可以使用它导入。Kibana 入门指南 ( http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html )中的shakesphere.json 示例
这意味着我需要以批量导入所需的格式创建文件,然后运行 curl 程序等。我想避免创建临时文件。
作为流式传输过程的一部分,是否有更简单的方法可以更快地将数据导入弹性搜索