0

我有一个流列表,列表的List[Stream[_]]大小在函数的开头是已知的,每个流的大小等于nn+1。我想获得交错流,例如

def myMagicFold[A](s: List[Stream[A]]): Stream[A]

val streams = List(Stream(1,1,1),Stream(2,2,2),Stream(3,3),Stream(4,4)) 

val result = myMagicFold(streams)

//result = Stream(1,2,3,4,1,2,3,4,1,2)

我正在使用fs2.Stream. 我的第一个观点:

val result = streams.fold(fs2.Stream.empty){
   case (s1, s2) => s1.interleaveAll(s2)
}

// result = Stream(1, 4, 3, 4, 2, 3, 1, 2, 1, 2)

我正在寻找基于基本操作的解决方案(map, fold,...)

4

2 回答 2

3

您最初的猜测是好的,但是interleaveAll太快变平了,所以这就是您没有得到预期订单的原因。这是应该做你试图实现的代码:


  def zipAll[F[_], A](streams: List[Stream[F, A]]): Stream[F, A] =
    streams
      .foldLeft[Stream[F, List[Option[A]]]](Stream.empty) { (acc, s) =>
        zipStreams(acc, s)
      }
      .flatMap(l => Stream.emits(l.reverse.flatten))

  def zipStreams[F[_], A](s1: Stream[F, List[Option[A]]], s2: Stream[F, A]): Stream[F, List[Option[A]]] =
    s1.zipAllWith(s2.map(Option(_)))(Nil, Option.empty[A]) { case (acc, a) => a :: acc }

在这种情况下,您将每个流的第 n 个元素添加到列表中,然后转换Stream为稍后展平为结果流的元素。由于fs2.Stream是基于拉的,因此您一次在内存中只有一个列表。

于 2019-10-02T20:17:16.940 回答
1

这是一个尝试,它按预期工作......

import cats.effect.IO
import cats.implicits._
import fs2.Stream

def myMagicFold[A](streams: List[Stream[IO, A]]): Stream[IO, A] =
  Stream.unfoldEval(streams) { streams =>
    streams.traverse { stream =>
      stream.head.compile.last
    } map { list =>
      list.sequence.map { chunk =>
        Stream.emits(chunk) -> list.map(_.tail)
      }
    }
  }.flatten

但是,这远不是一个好的解决方案,它的效率极低,因为它在每一步都重新评估每个Stream
您可以使用以下代码确认:

def stream(name: String, n: Int, value: Int): Stream[IO, Int] =
  Stream
    .range(start = 0, stopExclusive = n)
    .evalMap { i =>
      IO {
        println(s"${name} - ${i}")
        value
      }
    }
    
val list = List(stream("A", 3, 1), stream("B", 2, 2), stream("C", 3, 3))
myMagicFold(list).compile.toList.unsafeRunAsync(println)

哪个会打印

A - 0
B - 0
C - 0
A - 0
A - 1
B - 0
B - 1
C - 0
C - 1
A - 0
A - 1
A - 2
B - 0
B - 1
C - 0
C - 1
C - 2

右(列表(1、2、3、1、2、3))

我很确定可以使用Pulls解决此问题,但我对此没有任何经验。

于 2019-10-02T15:57:09.693 回答