1

合并排序例程中有以下代码段

val m = xs.length / 2
val (l, r) = xs.splitAt(m)
streamMerge(streamSort(l), streamSort(r))

有没有更实用(和懒惰)的方式将一个流分成两个?我尝试从这里移植拆分例程http://en.literateprograms.org/Merge_sort_(Haskell)但它会导致堆栈溢出崩溃。

4

3 回答 3

3

我看到了两种可能性:不使用长度或不使用流。

不要使用长度

长度是流的严格函数,所以你不能这样做。但是有多种非严格的可能性:

  1. 使用概率方法
  2. 使用每个第 n 个元素

概率方法

从流中获取前三个元素。当少于三个时,首先拆分没有任何意义。然后拆分大于这三个的第一个元素,使用Stream.partition(_ > biggestElement).

这通常会很好地工作,但在特殊数据星座(例如已经排序的输入)上会出现问题。

使用每个第 n 个元素

均匀地分割流,但不要在中间。用于Stream.zipWithIndex.partition(_._2 %2 == 0)获取两个流。

如果您通过网络将排序的某些部分卸载到其他节点,这可能是一个好方法。

不要使用流

当您根本不使用流时,您的算法可能会运行得更快,而是使用便宜的数据结构来获取大小。

如果你使用可变集合,你甚至可以就地排序。当您在本地拥有所有数据(例如在 RAM 中或在内存映射文件中)时,这应该是最快的方法。

于 2013-09-25T14:51:02.490 回答
2

如果您真的想split在参考中实现,则必须进行go尾递归。

正常实现(或多或少复制):

def go[A](v: (Stream[A], Stream[A])): (Stream[A], Stream[A]) = v match {
  case (x #:: xs, _ #:: _ #:: zs) =>
    val (us,vs) = go((xs,zs))
    (x #:: us, vs)
  case (xs, _) => (Stream.empty, xs)
}

这确实会溢出堆栈。

现在我们让它尾递归:

def go[A](v: (Stream[A], Stream[A]), acc: Stream[A]): (Stream[A], Stream[A]) = v match {
  case (x #:: xs, _ #:: _ #:: zs) =>
    go((xs,zs), x #:: acc)
  case (xs, _) => (acc.reverse, xs)
}

现在调用:

go((x,x), Stream.empty)

你会得到一个没有堆栈溢出的惰性拆分(在测试时我首先填满了我的内存)。

更新

正如我的评论所提到的,最后一个解决方案不适用于无限流。这种情况下的问题是结果的右侧:为了知道结果流(它只是原始流的尾部),我们必须全面评估原始流。

允许无限流的实现使这一点显而易见:

def split[A](x: Stream[A]) = {

  def goL(v: (Stream[A], Stream[A])): Stream[A] = v match {
    case (x #:: xs, _ #:: _ #:: zs) =>
      x #:: goL(xs, zs)
    case (xs, _) => Stream.empty
  }

  def goR(v: (Stream[A], Stream[A])): Stream[A] = v match {
    case (x #:: xs, _ #:: _ #:: zs) => goR(xs, zs)
    case (xs, _) => xs
  }

  val tup = (x,x)
  (goL(tup), () => goR(tup))

}

您可以看到左侧和右侧之间的根本区别:

  • 左侧不是尾递归,但不会溢出堆栈,因为递归goL调用被编译器包装在一个闭包中(蹦床模式的“隐藏”版本)
  • 调用goR被手动包裹在一个闭包中,否则调用goR不会终止。

除了右流的闭包外,这很好用。这可以通过提供围绕流的包装器/视图来缓解,该包装器/视图仅在使用时评估底层流(即Stream对象本身)。

上面的代码可以按如下方式使用:

val (a,b) = split(Stream.continually(1))
println(a.head) // > 1

val (c,d) = split(Stream.fill(1000000)(1))

println(c.size)    // > 500000
println(d().size)  // > 500000
于 2013-09-25T13:30:40.813 回答
1

您已经以正确的方式进行操作。懒惰地尝试合并排序是没有意义的。当你调用时你已经强制你的整个流xs.length,所以尝试使用惰性方法来拆分它不会有什么不同。

可以做的是让streamMerge函数变得懒惰。当您将排序的子列表合并在一起时,您只需要知道两个流中每个流的第一个元素,因此您可以在合并流时懒惰地确定哪个元素最小。这就是我的想法:

def streamMerge[T](xs: Stream[T], ys: Stream[T])(implicit ord: math.Ordering[T]): Stream[T] = {
  if (xs.isEmpty) ys
  else if (ys.isEmpty) xs
  else {
    if (ord.lteq(xs.head, ys.head))
      xs.head #:: streamMerge(xs.tail, ys)
    else 
      ys.head #:: streamMerge(xs, ys.tail)
  }
}

def streamSort[T](xs: Stream[T])(implicit ord: math.Ordering[T]): Stream[T] = xs match {
  case Stream.Empty => xs
  case Stream(_) => xs
  case _ => {
    val m = xs.length / 2
    val (l, r) = xs.splitAt(m)
    streamMerge(streamSort(l), streamSort(r))
  }
}
于 2013-09-25T06:55:28.600 回答