6

我想并行运行 N 个嵌套流/管道,并将每个元素仅发送到一个嵌套流。平衡允许我这样做,但我想将具有相同“键”的元素路由到相同的嵌套流或管道。

我看不到任何功能可以做到这一点,所以我编写了一个基本的 POC,它将每个元素广播到每个流。然后流/管道只过滤它应该处理的元素(见下文)。这似乎效率很低,有没有更好的方法将元素路由到特定的嵌套流?

package io.xxx.streams

import cats.effect.{ExitCode, IO, IOApp}
import fs2.{Pipe, Stream}

object StreamsApp extends IOApp {

  import cats.syntax.functor._
  import scala.concurrent.duration._

  case class StreamMessage(routingKey: Int, value: String)

  // filter elements which belong to the given bin
  def filterAndLog(bin: Int, numBins: Int): IO[Pipe[IO, StreamMessage, Unit]] = IO {
    val predicate = (m: StreamMessage) => m.routingKey % numBins == bin

    in: Stream[IO, StreamMessage] => {
      in.filter(predicate).evalMap(m => IO {
        println(s"bin $bin - ${m.value}")
      })
    }
  }

  override def run(args: List[String]): IO[ExitCode] = {
    val effectsStream = for {
      pipeOne <- Stream.eval(filterAndLog(0, 2))
      pipeTwo <- Stream.eval(filterAndLog(1, 2))
      s <- Stream
        .fixedDelay[IO](100.millis)
        .zipRight(Stream.range(0, 50))
        .map(i => StreamMessage(i, s"message $i"))
        .broadcastThrough(pipeOne, pipeTwo)
    } yield s

    effectsStream.compile.drain.as(ExitCode(0))
  }

}

具有相同路由键的消息应由相同的流/管道处理

4

0 回答 0