合并排序例程中有以下代码段
val m = xs.length / 2
val (l, r) = xs.splitAt(m)
streamMerge(streamSort(l), streamSort(r))
有没有更实用(和懒惰)的方式将一个流分成两个?我尝试从这里移植拆分例程http://en.literateprograms.org/Merge_sort_(Haskell)但它会导致堆栈溢出崩溃。
合并排序例程中有以下代码段
val m = xs.length / 2
val (l, r) = xs.splitAt(m)
streamMerge(streamSort(l), streamSort(r))
有没有更实用(和懒惰)的方式将一个流分成两个?我尝试从这里移植拆分例程http://en.literateprograms.org/Merge_sort_(Haskell)但它会导致堆栈溢出崩溃。
我看到了两种可能性:不使用长度或不使用流。
长度是流的严格函数,所以你不能这样做。但是有多种非严格的可能性:
从流中获取前三个元素。当少于三个时,首先拆分没有任何意义。然后拆分大于这三个的第一个元素,使用Stream.partition(_ > biggestElement)
.
这通常会很好地工作,但在特殊数据星座(例如已经排序的输入)上会出现问题。
均匀地分割流,但不要在中间。用于Stream.zipWithIndex.partition(_._2 %2 == 0)
获取两个流。
如果您通过网络将排序的某些部分卸载到其他节点,这可能是一个好方法。
当您根本不使用流时,您的算法可能会运行得更快,而是使用便宜的数据结构来获取大小。
如果你使用可变集合,你甚至可以就地排序。当您在本地拥有所有数据(例如在 RAM 中或在内存映射文件中)时,这应该是最快的方法。
如果您真的想split
在参考中实现,则必须进行go
尾递归。
正常实现(或多或少复制):
def go[A](v: (Stream[A], Stream[A])): (Stream[A], Stream[A]) = v match {
case (x #:: xs, _ #:: _ #:: zs) =>
val (us,vs) = go((xs,zs))
(x #:: us, vs)
case (xs, _) => (Stream.empty, xs)
}
这确实会溢出堆栈。
现在我们让它尾递归:
def go[A](v: (Stream[A], Stream[A]), acc: Stream[A]): (Stream[A], Stream[A]) = v match {
case (x #:: xs, _ #:: _ #:: zs) =>
go((xs,zs), x #:: acc)
case (xs, _) => (acc.reverse, xs)
}
现在调用:
go((x,x), Stream.empty)
你会得到一个没有堆栈溢出的惰性拆分(在测试时我首先填满了我的内存)。
正如我的评论所提到的,最后一个解决方案不适用于无限流。这种情况下的问题是结果的右侧:为了知道结果流(它只是原始流的尾部),我们必须全面评估原始流。
允许无限流的实现使这一点显而易见:
def split[A](x: Stream[A]) = {
def goL(v: (Stream[A], Stream[A])): Stream[A] = v match {
case (x #:: xs, _ #:: _ #:: zs) =>
x #:: goL(xs, zs)
case (xs, _) => Stream.empty
}
def goR(v: (Stream[A], Stream[A])): Stream[A] = v match {
case (x #:: xs, _ #:: _ #:: zs) => goR(xs, zs)
case (xs, _) => xs
}
val tup = (x,x)
(goL(tup), () => goR(tup))
}
您可以看到左侧和右侧之间的根本区别:
goL
调用被编译器包装在一个闭包中(蹦床模式的“隐藏”版本)goR
被手动包裹在一个闭包中,否则调用goR
不会终止。除了右流的闭包外,这很好用。这可以通过提供围绕流的包装器/视图来缓解,该包装器/视图仅在使用时评估底层流(即Stream
对象本身)。
上面的代码可以按如下方式使用:
val (a,b) = split(Stream.continually(1))
println(a.head) // > 1
val (c,d) = split(Stream.fill(1000000)(1))
println(c.size) // > 500000
println(d().size) // > 500000
您已经以正确的方式进行操作。懒惰地尝试合并排序是没有意义的。当你调用时你已经强制你的整个流xs.length
,所以尝试使用惰性方法来拆分它不会有什么不同。
你可以做的是让streamMerge
函数变得懒惰。当您将排序的子列表合并在一起时,您只需要知道两个流中每个流的第一个元素,因此您可以在合并流时懒惰地确定哪个元素最小。这就是我的想法:
def streamMerge[T](xs: Stream[T], ys: Stream[T])(implicit ord: math.Ordering[T]): Stream[T] = {
if (xs.isEmpty) ys
else if (ys.isEmpty) xs
else {
if (ord.lteq(xs.head, ys.head))
xs.head #:: streamMerge(xs.tail, ys)
else
ys.head #:: streamMerge(xs, ys.tail)
}
}
def streamSort[T](xs: Stream[T])(implicit ord: math.Ordering[T]): Stream[T] = xs match {
case Stream.Empty => xs
case Stream(_) => xs
case _ => {
val m = xs.length / 2
val (l, r) = xs.splitAt(m)
streamMerge(streamSort(l), streamSort(r))
}
}