8

在受到 NoFlo.js 的启发后,我正在学习 highland.js。我希望能够让流递归操作。在这个人为的示例中,我将提供一个乘以 2 的数字,我们过滤结果 <= 512。一旦将数字相乘,它就会反馈到系统中。我的代码有效,但如果我在管道中取出 doto 函数,它不会处理任何数字。我怀疑我将数据错误地发送回 returnPipe。有没有更好的方法将数据传输回系统?我错过了什么?

###
  input>--m--->multiplyBy2>---+
          |                   |
          |                   |
          +---<returnPipe<----+
###

H = require('highland')

input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
 .map((v)-> return v * 2)
 .filter((v)-> return v <= 512)
 .pipe(returnPipe)
4

2 回答 2

5

来自文档:doto重新发出源流时分离流。这意味着就管道而言,有一个函数仍在通过它传递流。如果doto取出,原始流不会在下一次迭代中通过返回流返回。

如果要使用管道,则必须向它传递一个接受流并发出流的方法。例如,您可以将doto方法替换为类似于H.map((v)=>{console.log(v); return v;})调用中的方法,H.pipeline并且由于该方法使用流并发出流,因此当流被传递回它时,它将继续流动.pipe(returnPipe)

编辑:要回答你的问题,当你声明let input = H([1])你实际上是在那里创建一个流时。您可以删除对管道和 returnPipe 的任何引用,并使用以下代码生成相同的输出:

let input = H([1]);

input.map((v)=> {
  return v * 2;
})
.filter((v)=> {
  if (v <= 512) {
    console.log(v);
  }
  return v <= 512;
})
.pipe(input);
于 2015-09-02T06:26:18.943 回答
1

我最初的意图是在 highland.js 中编写一个递归文件阅读器。我发布到 highland.js github 问题列表中,Victor Vu 帮助我把它和一篇精彩的文章放在一起。

H = require('highland')
fs = require('fs')
fsPath = require('path')

###
  directory >---m----------> dirFilesStream >-------------f----> out
                |                                         |
                |                                         |
                +-------------< returnPipe <--------------+

  legend: (m)erge  (f)ork

 + directory         has the initial file
 + dirListStream     does a directory listing
 + out               prints out the full path of the file
 + directoryFilter   runs stat and filters on directories
 + returnPipe        the only way i can

###

directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
  H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
    fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
  H.wrapCallback(fs.stat)(path).map (v) ->
    v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log
于 2015-09-10T16:17:29.827 回答