0

我正在尝试编写一个可写流,该流采用对象流并将它们输入到 mongodb 数据库中。在消费对象流之前,我首先需要等待数据库连接建立,但我似乎做错了什么,因为程序永远不会到达插入部分。

// ./mongowriter.js

let mongo = mongodb.MongoClient,
    connectToDb = _.wrapCallback(mongo.connect);

export default url => _.pipeline(s => {
  return connectToDb(url).flatMap(db => {
    console.log('Connection established!');
    return s.flatMap(x => /* insert x into db */);
  });
});

....

// Usage in other file
import mongowriter from './mongowriter.js';

let objStream = _([/* json objects */]);

objStream.pipe(mongoWriter);

该程序只是退出而没有“建立连接!” 曾经被写入控制台。

我错过了什么?我应该遵循某种成语吗?

4

1 回答 1

0

通过阅读源代码和一些一般性的实验,我想出了如何做一个异步的事情,然后继续通过流处理。基本上,您使用 flatMap 将异步任务中的事件替换为您实际要处理的流。

另一个我没想到并且让我失望的怪癖是,_.pipeline除非原始流在回调中完全消耗,否则这将不起作用。这就是为什么它不能简单地放入一个 _.map 并记录东西(这就是我尝试调试它的方式)。相反,需要确保在末尾有一个each或。done下面是一个最小的例子:

export default _ => _.pipeline( stream => {
  return _(promiseReturningFunction())
    .tap(_ => process.stdout.write('.'))
    .flatMap(_ => stream)
    .each(_ => process.stdout.write('-'));
});

// Will produce something like the following when called with a non-empty stream.
// Note the lone '.' in the beginning.
// => .-------------------

基本上,一个'。当异步函数完成时输出,流的每个对象都有一个“-”。

希望这可以节省一些时间。我花了很长时间才弄清楚这一点。^^

于 2016-03-08T18:10:42.923 回答