我正在尝试使用 Highland.js 设计工作流程。我无法弄清楚 Highland.js 如何用于它。
我有一个基于流的工作流程如下(伪代码),
read //fs.createReadStream(...)
.pipe(parse) //JSONStream.parse(...)
.pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
.pipe(transform) //fn(item) { return tranform(item); }
.pipe(write); //mongoClient.db.collection.insert(doc)
filterDuplicate 查找数据库以检查读取记录是否存在(使用条件)并返回布尔结果。为了使过滤器工作,它需要一个活动的数据库连接,我想在流完成之前重复使用它。一种方法是在读取之前打开连接并关闭写入的“完成”事件;这意味着我需要将连接作为参数传递以进行过滤和写入,如果两种方法都使用相同的数据库,这将起作用。
在上述工作流程中,filterDuplicate 和 write 也可能使用不同的数据库。所以我希望连接在每个函数中都包含和管理,这使它成为一个独立的可重用单元。
我正在寻找有关如何使用 Highland 进行设计的任何输入。
谢谢。