2

我希望能够将数据从 scalaz 流发送到外部程序,然后在未来大约 100 毫秒内得到该项目的结果。虽然我可以通过以下代码通过Sink输入流压缩输出流Process然后丢弃Sink副作用来做到这一点,但我觉得这个解决方案可能非常脆弱。

如果外部程序的输入项之一有错误,那么一切都将不同步。我觉得最好的选择是将某种增量 ID 发送到外部程序中,它可以在将来回显,这样如果发生错误,我们可以重新同步。

我遇到的主要麻烦是将数据发送到外部程序的结果与程序Process[Task, Unit]的输出结合在一起Process[Task, String]。我觉得我应该使用来自wyn但不确定的东西。

import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(1000)
     System.currentTimeMillis().toString
  })

  val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
  val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

  val in: Process[Task, Unit] = source to printLines

  val zip: Process[Task, (Unit, String)] = in.zip(linesR)
  val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
  out.run.run
}
4

1 回答 1

0

在深入研究更高级的类型之后。它看起来Exchange正是我想要的。

import java.io.PrintStream

import scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.io._

object Main extends App {
/*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
*/
  val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")

  val source: Process[Task, String] = Process.repeatEval(Task{
     Thread.sleep(100)
     System.currentTimeMillis().toString
  })

  val read: stream.Process[Task, String] = linesR(program.getInputStream)
  val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
  val exchange: Exchange[String, String] = Exchange(read, write)
  println(exchange.run(source).take(10).runLog.run)
}
于 2015-06-02T07:47:38.500 回答