我希望能够将数据从 scalaz 流发送到外部程序,然后在未来大约 100 毫秒内得到该项目的结果。虽然我可以通过以下代码通过Sink
输入流压缩输出流Process
然后丢弃Sink
副作用来做到这一点,但我觉得这个解决方案可能非常脆弱。
如果外部程序的输入项之一有错误,那么一切都将不同步。我觉得最好的选择是将某种增量 ID 发送到外部程序中,它可以在将来回显,这样如果发生错误,我们可以重新同步。
我遇到的主要麻烦是将数据发送到外部程序的结果与程序Process[Task, Unit]
的输出结合在一起Process[Task, String]
。我觉得我应该使用来自wyn
但不确定的东西。
import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._
object Main extends App {
/*
# echo.sh just prints to stdout what it gets on stdin
while read line; do
sleep 0.1
echo $line
done
*/
val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")
val source: Process[Task, String] = Process.repeatEval(Task{
Thread.sleep(1000)
System.currentTimeMillis().toString
})
val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))
val in: Process[Task, Unit] = source to printLines
val zip: Process[Task, (Unit, String)] = in.zip(linesR)
val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
out.run.run
}