我想知道是否有一种优雅的方式来实现这样的目标:
val l = Stream.from(1)
val parts = l.some_function(3) //any number
parts.foreach( println(_) )
> 1,4,7,10...
> 2,5,8,11...
> 3,6,9,12...
实际上,我需要在 Streams 上进行此类操作以进行并行化 - 将数据拆分到多个参与者之间,而无需将整个内容加载到内存中。
我想知道是否有一种优雅的方式来实现这样的目标:
val l = Stream.from(1)
val parts = l.some_function(3) //any number
parts.foreach( println(_) )
> 1,4,7,10...
> 2,5,8,11...
> 3,6,9,12...
实际上,我需要在 Streams 上进行此类操作以进行并行化 - 将数据拆分到多个参与者之间,而无需将整个内容加载到内存中。
Split a scala list into n interleaving lists的答案完全符合条件,稍作修改以适应 Streams:
def round[A](seq: Iterable[A], n: Int) = {
(0 until n).map(i => seq.drop(i).sliding(1, n).flatten)
}
round(Stream.from(1),3).foreach(i => println(i.take(3).toList))
List(1, 4, 7)
List(2, 5, 8)
List(3, 6, 9)
我唯一能想到的:
def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p =>
x.zipWithIndex.collect {
case (e,i) if i % n == p => e
}
}
这有点难看,因为每个子流都必须完全遍历主流。但我认为你不能在保持(明显)不变性的同时减轻这种情况。
你有没有想过将单个任务分派给演员,并有一个“任务分配器”来做这件事?
一种简单的方法涉及为您想要的索引生成算术序列,然后将其映射到流。apply 方法会拉出对应的值:
def f[A]( s:Stream[A], n:Int ) =
0 until n map ( i => Iterator.iterate(0)(_+n) map ( s drop i ) )
f( Stream from 1, 3 ) map ( _ take 4 mkString "," )
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)
一个更高效的解决方案将使用一个迭代器,它的 next 方法只是从流中返回算术序列中下一个索引处的值:
def comb[A]( s:Stream[A], first:Int, step:Int ):Iterator[A] = new Iterator {
var i = first - step
def hasNext = true
def next = { i += step; s(i) }
}
def g[A]( s:Stream[A], n:Int ) =
0 until n map ( i => comb(s,i,n) )
g( Stream from 1, 3 ) map ( _ take 4 mkString "," )
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12)
不过,您提到这是针对演员的——如果这是 Akka,也许您可以使用循环路由器。
更新:上面(显然是错误的)假设只要程序正在运行,就可以做更多的工作,所以 hasNext 总是返回 true;请参阅 Mikhail 的答案以获取也适用于有限流的版本。
更新:米哈伊尔已经确定这个对先前 StackOverflow 问题的答案实际上有一个适用于有限和无限流的答案(尽管它看起来不像迭代器那样执行)。
scala> (1 to 30 grouped 3).toList.transpose foreach println
List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28)
List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29)
List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30)
def roundRobin[T](n: Int, xs: Stream[T]) = {
val groups = xs.grouped(n).map(_.toIndexedSeq).toStream
(0 until n).map(i => groups.flatMap(_.lift(i)))
}
在无限情况下工作:
scala> roundRobin(3, Stream.from(0)).map(_.take(3).force.mkString).mkString(" ")
res6: String = 036 147 258
使用flatMap
/lift
而不是普通的map
/apply
意味着即使输入是有限的并且长度不是 n 的倍数,它也可以工作:
scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ")
res5: String = 0369 147 258
我在 Scala 库中没有找到任何这样的函数,所以我改进了 AmigoNico 答案的迭代器变体。该代码同时处理有限和无限集合。
def splitRoundRobin[A](s: Iterable[A], n: Int) = {
def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] {
val iter = s.iterator
var nextElem: Option[A] = iterToNext(first)
def iterToNext(elemsToSkip: Int) = {
iterToNextRec(None, elemsToSkip)
}
def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match {
case 0 => next
case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None
}
def hasNext = nextElem.isDefined || {
nextElem = iterToNext(step)
nextElem.isDefined
}
def next = {
var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next")
nextElem = None
result
}
}
0 until n map (i => comb(s, i, n))
}
splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(","))
// Vector(3,6,9,12, 1,4,7,10, 2,5,8,11)
splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(","))
//> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11)