2

如果我有一个发送类型值的简单过程,String并且我希望将它们发送到多个接收器(即每个接收器都被发送String),我该怎么做?

例如,运行这个程序:

object Play extends App {

  def prepend(s: String): String => String = s ++ _
  val out1 = io.stdOutLines.map(prepend("1-") andThen _)
  val out2 = io.stdOutLines.map(prepend("2-") andThen _)

  val p = io.stdInLines to (out1 merge out2)
  p.run.run
}

输出如下所示:

a     //input
1-a
b     //input
2-b
c     //input
2-c
d     //input
1-d

我希望输出是这样的:

a     //input
1-a
2-a
b     //input
2-b
1-b
c     //input
2-c
1-c
d     //input
1-d
2-d

编辑

我可以按如下方式实现:

implicit class ToBoth[O](p: Process[Task, O]) {
  def toBoth(s1: Sink[Task, O], s2: Sink[Task, O]): Process[Task, Unit] = {
    (for (o <- p; n <- Process.emit(o) ++ Process.emit(o)) yield n) to (s1 interleave s2)
  }
}

也就是说,我复制输入并交错输出。这可以概括为:

def toAll(sinks: Sink[Task, O] *): Process[Task, Unit] = {
  (for (o <- p; n <- Process.emitAll(sinks.map(_ => o))) yield n) to sinks.reduceLeftOption(_ interleave _).getOrElse(Process.empty)
}

编辑 2

我刚刚意识到泛化toAll不起作用。toBoth确实,虽然

有更好的(内置)方法吗?

4

1 回答 1

3

您还可以使用observeandto将多个Sinks 附加到 a Process

val p = io.stdInLines.observe(out1).to(out2)

observe就像to但与传入水槽的内容相呼应。所以io.stdInLines.observe(out1)仍然会发出来自 stdin 的字符串(这意味着它是 type Process[Task, String]),但也会将它们发送到 sink out1


正如 Eric 所指出的,zip Sinks 在一起也是可能的。这是一个更详细的示例,它根据日志级别将日志文件的各个行发送到不同的接收器:

sealed trait Loglevel
case object Info extends Loglevel
case object Debug extends Loglevel
case object Warning extends Loglevel

case class Line(level: Loglevel, line: String)

val outInfo = io.stdOutLines.contramap((l: Line) => "I: " + l.line)
val outDebug = io.stdOutLines.contramap((l: Line) => "D: " + l.line)
val outWarning = io.stdOutLines.contramap((l: Line) => "W: " + l.line)

val zipped = outInfo.zip(outDebug).zip(outWarning).map {
  case ((fInfo, fDebug), fWarning) =>
    (l: Line) => l.level match {
      case Info    => fInfo(l)
      case Debug   => fDebug(l)
      case Warning => fWarning(l)
    }
}

val lines = List(
  Line(Info, "Hello"),
  Line(Warning, "Oops"),
  Line(Debug, "ui ui"),
  Line(Info, "World"))

Process.emitAll(lines).liftIO.to(zipped).run.run

运行它将输出:

I: Hello
W: Oops
D: ui ui
I: World
于 2014-11-20T21:48:33.653 回答