0

我想将下面的对象附加到流中的每个对象

{"index":{"_index":"tvseries","_type":"internindex"}}

我的流看起来像这样

[
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
  {"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
  {"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]

我的流应该是什么样子!

> -> POST http://localhost:9200/_bulk   {"index":{"_index":"tvseries","_type":"internindex"}}  
> {"showname":"The X Files","episode":"04","content":"Before
> what?","season":"1"}  
> {"index":{"_index":"tvseries","_type":"internindex"}}  
> {"showname":"The X
> Files","episode":"04","content":"Great.","season":"1"}  
> {"index":{"_index":"tvseries","_type":"internindex"}}  
> {"showname":"The X
> Files","episode":"01","content":"What?","season":"1"}

如何在我现有的以下代码库中使用 jsonstream 来实现这一点

var stream = new ElasticsearchWritableStream(client, {
  highWaterMark: 256,
  flushTimeout: 500
});

pg.connect(connectionString,function(err, client, done) {
  if(err) throw err;
  var query = new QueryStream('SELECT * FROM srt limit 2')
  var streams = client.query(query)

  //release the client when the stream is finished
  streams.on('end', done)
  streams.pipe(JSONStream.stringify()).pipe(stream)
})

我目前正在使用的 npm 包

用于弹性搜索中的批量插入!

弹性搜索可写流

用于从 postgres 获取数据到流中!

pg-查询流

缺少的部分是将 postgres 流转换为弹性可写流!关于如何实现这一目标的任何建议、指示和建议!

4

1 回答 1

0

所以基本上,没有太多代码更改的唯一可行选项是构建从 postgres 本身而不是在 node.js 对象中批量插入弹性搜索所需的格式!

"SELECT 'tvseries' as index,'internindex' as type, json_build_object('showname', showname, 'epsiode', ep,'content',content,'season',season) AS body"
+" FROM   srt  where shownameid=4"
于 2017-07-13T14:00:50.417 回答