8

这是我在 Scala 中的合并排序实现:

object FuncSort {
  def merge(l: Stream[Int], r: Stream[Int]) : Stream[Int] = {
    (l, r) match {
      case (h #:: t, Empty) => l
      case (Empty, h #:: t) => r
      case (x #:: xs, y #:: ys) => if(x < y ) x #:: merge(xs, r) else y #:: merge(l, ys)
    }
  }

  def sort(xs: Stream[Int]) : Stream[Int] = {
    if(xs.length == 1) xs
    else {
      val m = xs.length / 2
      val (l, r) = xs.splitAt(m)
      merge(sort(l), sort(r))
    }
  }
}

它工作正常,似乎渐近它也很好,但它比这里的 Java 实现慢(大约 10 倍)http://algs4.cs.princeton.edu/22mergesort/Merge.java.html并使用很多内存。是否有更快的合并排序实现功能?显然,可以逐行移植 Java 版本,但这不是我想要的。

UPD:我更改Stream了 toList#::to ::,排序例程变得更快,仅比 Java 版本慢三到四倍。但我不明白为什么它不会因堆栈溢出而崩溃?merge不是尾递归的,所有参数都经过严格评估……这怎么可能?

4

3 回答 3

3

你提出了多个问题。我尝试按逻辑顺序回答它们:

Stream 版本中没有堆栈溢出

你并没有真正问这个问题,但它导致了一些有趣的观察。

#:: merge(...)在您在函数内部使用的 Stream 版本中merge。通常这将是一个递归调用,并且对于足够大的输入数据可能会导致堆栈溢出。但在这种情况下不是。运算符#::(a,b)class ConsWrapper[A](存在隐式转换)中实现并且是 . 的同义词cons.apply[A](hd: A, tl: ⇒ Stream[A]): Cons[A]。如您所见,第二个参数是按名称调用的,这意味着它是惰性求值的。

这意味着merge返回一个新创建的类型对象,cons该对象最终将再次调用合并。换句话说:递归不会发生在堆栈上,而是发生在堆上。通常你有很多堆。

使用堆进行递归是一种处理非常深递归的好技术。但它比使用堆栈要慢得多。所以你用速度换取了递归深度。Stream这就是为什么使用这么慢的主要原因。

第二个原因是,为了获得 的长度Stream,Scala 必须将整个Stream. 但是在排序过程中Stream,无论如何它都必须具体化每个元素,所以这不会造成太大的伤害。

List 版本没有堆栈溢出

当您将 Stream 更改为 List 时,您确实在使用堆栈进行递归。现在可能会发生堆栈溢出。但是对于排序,你通常有一个递归深度log(size),通常是 base 的对数2。因此,要对 40 亿个输入项进行排序,您将需要大约 32 个堆栈帧。默认堆栈大小至少为 320k(在 Windows 上,其他系统具有更大的默认值),这为大量递归留下了空间,因此为大量输入数据进行了排序。

更快的功能实现

这取决于 :-)

您应该使用堆栈而不是堆进行递归。你应该根据输入数据来决定你的策略:

  1. 对于小数据块,使用一些简单的算法对它们进行适当的排序。算法复杂性不会咬你,你可以从缓存中的所有数据中获得很多性能。当然,你仍然可以为给定的大小手动编码排序网络
  2. 如果您有数字输入数据,则可以使用基数排序并将工作处理到处理器或 GPU 上的矢量单元(更复杂的算法可以在GPU Gems中找到)。
  3. 对于中等大小的数据块,您可以使用分而治之的策略将数据拆分到多个线程(仅当您有多个内核时!)
  4. 对于巨大的数据块,使用合并排序并将其拆分为适合内存的块。如果需要,您可以将这些块分布在网络上并在内存中排序。

不要使用交换并使用您的缓存。如果可以的话,使用可变数据结构并就地排序。我认为功能排序和快速排序不能很好地结合在一起。要使排序真正快速,您将不得不使用有状态操作(例如,可变数组上的就地合并排序)。

我通常在我的所有程序上尝试这个:尽可能使用纯函数样式,但在可行的情况下对小部分使用有状态操作(例如,因为它具有更好的性能,或者代码只需要处理很多状态并且在我使用vars 而不是vals)。

于 2013-09-26T07:49:57.397 回答
2

这里有几点需要注意。

首先,您没有正确考虑要排序的初始流为空的情况。您可以通过将 sort 内部的初始检查修改为 read 来解决此问题if(xs.length <= 1) xs

其次,流可能具有无法计算的长度(例如Strem.from(1)),这在尝试计算该长度的一半(可能是无限的)时会造成问题 - 您可能需要考虑检查 usinghasDefiniteSize或类似的(尽管天真地使用这可能会过滤掉一些其他可计算的流)。

最后,它被定义为对流进行操作这一事实可能会减慢它的速度。我尝试对合并排序的流版本与写入进程列表的版本的大量运行进行计时,并且列表版本的运行速度大约快 3 倍(诚然,仅在一对运行中)。这表明以这种方式使用流的效率低于列表或其他序列类型(Vector 可能更快,或者根据引用的 Java 解决方案使用数组)。

也就是说,我不是时间和效率方面的专家,所以其他人可能能够给出更博学的回应。

于 2013-09-22T14:28:45.013 回答
1

您的实现是自上而下的合并排序。我发现自下而上的合并排序更快,并且与List.sorted(对于我的测试用例,随机大小的随机数列表)相当。

def bottomUpMergeSort[A](la: List[A])(implicit ord: Ordering[A]): List[A] = {
  val l = la.length

  @scala.annotation.tailrec
  def merge(l: List[A], r: List[A], acc: List[A] = Nil): List[A] = (l, r) match {
    case (Nil, Nil)           => acc
    case (Nil, h :: t)        => merge(Nil, t, h :: acc)
    case (h :: t, Nil)        => merge(t, Nil, h :: acc)
    case (lh :: lt, rh :: rt) =>
      if(ord.lt(lh, rh)) merge(lt, r, lh :: acc)
      else               merge(l, rt, rh :: acc)
  }

  @scala.annotation.tailrec
  def process(la: List[A], h: Int, acc: List[A] = Nil): List[A] = {
    if(la == Nil) acc.reverse
    else {
      val (l1, r1) = la.splitAt(h)
      val (l2, r2) = r1.splitAt(h)

      process(r2, h, merge(l1, l2, acc))
    }
  }

  @scala.annotation.tailrec
  def run(la: List[A], h: Int): List[A] =
    if(h >= l) la
    else       run(process(la, h), h * 2)

  run(la, 1)
}
于 2013-09-26T11:13:54.740 回答