0

我正在尝试使用 Highland.js 设计工作流程。我无法弄清楚 Highland.js 如何用于它。

我有一个基于流的工作流程如下(伪代码),

read                      //fs.createReadStream(...)
   .pipe(parse)           //JSONStream.parse(...)
   .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
   .pipe(transform)       //fn(item) { return tranform(item); }
   .pipe(write);          //mongoClient.db.collection.insert(doc)

filterDuplicate 查找数据库以检查读取记录是否存在(使用条件)并返回布尔结果。为了使过滤器工作,它需要一个活动的数据库连接,我想在流完成之前重复使用它。一种方法是在读取之前打开连接并关闭写入的“完成”事件;这意味着我需要将连接作为参数传递以进行过滤和写入,如果两种方法都使用相同的数据库,这将起作用。

在上述工作流程中,filterDuplicate 和 write 也可能使用不同的数据库。所以我希望连接在每个函数中都包含和管理,这使它成为一个独立的可重用单元。

我正在寻找有关如何使用 Highland 进行设计的任何输入。

谢谢。

4

1 回答 1

0

它不会像使用pipe一堆时间那么容易。您必须为任务使用最合适的 API 方法。

这是您可能最终接近的粗略示例:

read
  .through(JSONStream.parse([true]))
  .through((x) => {
    h((next, push) => { // use a generator for async operations
      h.wrapCallback( mongoCountQuery )( params ) // you don't have to do it this way
        .collect()
        .toCallback((err, result) => {
          if ( result > 0 ) push( err, x ); // if it met the criteria, hold onto it
          return push( null, h.nil ); // tell highland this stream is done
        });
    });
  })
  .merge() // because you've got a stream of streams after that `through`
  .map(transform) // just your standard map through a transform
  .through((x) => {
    h((next, push) => { // another generator for async operations
      h.wrapCallback( mongoUpdateQuery )( params )
        .toCallback((err, results) => {
          push( err, results );
          return push( null, h.nil );
        });
    });
  })
  .merge() // another stream-of-streams situation
  .toCallback( cb ); // call home to say we're done
于 2016-06-14T03:51:54.797 回答