6

是否有任何算法可以使链表的并行排序值得?

众所周知,合并排序是用于对链表进行排序的最佳算法。

大多数归并排序都是用数组来解释的,每一半都是递归排序的。这将使并行化变得微不足道:独立排序每一半,然后合并两半。

但是链表没有“中途”点;一个链表一直到它结束:

头 → [a] → [b] → [c] → [d] → [e] → [f] → [g] → [h] → [i] → [j] → ...

我现在的实现遍历列表一次以获取计数,然后递归拆分计数,直到我们将一个节点与其进行比较NextNode。递归负责记住两半的位置。

这意味着链表的 MergeSort 在列表中线性进行。由于它似乎要求通过列表线性发展,我认为它不能并行化。我能想象的唯一方法是:

  • 遍历列表以计数 O(n)
  • 走一半的列表到达中点O(n/2)
  • 然后对每一半进行排序O(n log n)

但即使我们在单独的线程中并行化排序(a,b)和(c,d),我认为NextNode重新排序期间的错误共享会破坏并行化的任何优点。

是否有任何用于对链表进行排序的并行算法?

数组归并排序算法

以下是对数组执行合并排序的标准算法:

algorithm Merge-Sort
    input:
        an array, A (the values to be sorted)
        an integer, p (the lower bound of the values to be sorted)
        an integer, r (the upper bound of the values to be sorted)

    define variables:
        an integer, q (the midpoint of the values to be sorted)

    q ← ⌊(p+r)/2⌋
    Merge-Sort(A, p, q)   //sort the lower half
    Merge-Sort(A, q+1, r) //sort the upper half   
    Merge(A, p, q, r)     

该算法是为具有任意索引访问的数组而设计的。为了使其适用于链表,必须对其进行修改。

链表合并排序算法

这是(单线程)单链表,合并排序,我目前用来对单链表进行排序的算法。它来自Gonnet + Baeza Yates 算法手册

algorithm sort:
    input:
        a reference to a list, r (pointer to the first item in the linked list)
        an integer, n (the number of items to be sorted)
    output:
        a reference to a list (pointer to the sorted list)

    define variables:
        a reference to a list, A (pointer to the sorted top half of the list)
        a reference to a list, B (pointer to the sorted bottom half of the list)
        a reference to a list, temp (temporary variable used to swap)

    if r = nil then
        return nil

    if n > 1 then
        A ← sort(r, ⌊n/2⌋ )
        B ← sort(r, ⌊(n+1)/2⌋ )
        return merge( A, B )

    temp ← r
    r ← r.next
    temp.next ← nil
    return temp

Pascal 实现将是:

function MergeSort(var r: list; n: integer): list;
begin
   if r = nil then 
       Result := nil
   else if n > 1 then
      Result := Merge(MergeSort(r, n div 2), MergeSort(r, (n+1) div 2) )
   else
   begin
      Result := r;
      r := r.next;
      Result.next := nil;
   end
end;

如果我的转码有效,这里有一个即时 C# 翻译:

list function MergeSort(ref list r, Int32 n)
{
   if (r == null)
      return null;

    if (n > 1)
    {
       list A = MergeSort(r, n / 2);
       list B = MergeSort(r, (n+1) / 2);
       return Merge(A, B);
    }
    else
    {
       list temp = r;
       r = r.next;
       temp.next = null;
       return temp;
    }
}

我现在需要的是一种并行算法来对链表进行排序。它不一定是归并排序。

有些人建议复制接下来的n项,其中 n 项适合单个缓存行,并使用这些生成一个任务。

样本数据

algorithm GenerateSampleData
    input:
        an integer, n (the number of items to generate in the linked list)
    output:
        a reference to a node (the head of the linked list of random data to be sorted)

    define variables:
        a reference to a node, head (the returned head)
        a reference to a node, item (an item in the linked list)
        an integer, i (a counter)

    head ← new node
    item ← head        

    for i ← 1 to n do
        item.value ← Random()
        item.next ← new node
        item ← item.next

    return head

所以我们可以通过调用生成一个包含 300,000 个随机项的列表:

head := GenerateSampleData(300000);

基准

Time to generate 300,000 items    568 ms

MergeSort 
    count splitting variation   3,888 ms (baseline)

MergeSort
    Slow-Fast midpoint finding  3,920 ms (0.8% slower)

QuickSort
    Copy linked list to array       4 ms
    Quicksort array             5,609 ms
    Relink list                     5 ms
    Total                       5,625 ms (44% slower)

奖金阅读

4

5 回答 5

5

Mergesort 非常适合并行排序。将列表分成两半并对每一部分进行并行排序,然后合并结果。如果您需要两个以上的并行排序过程,请递归执行。如果你碰巧没有无限多的 CPU,你可以在一定的递归深度(你必须通过测试来确定)省略并行化。

顺便说一句,将列表分成大小大致相同的两半的常用方法是弗洛伊德的循环查找算法,也称为兔子和乌龟方法:

Node MergeSort(Node head)
{
   if ((head == null) || (head.Next == null))
      return head; //Oops, don't return null; what if only head.Next was null

   Node firstHalf = head;
   Node middle = GetMiddle(head);
   Node secondHalf = middle.Next;
   middle.Next = null; //cut the two halves

   //Sort the lower and upper halves
   firstHalf = MergeSort(firstHalf);
   secondHalf = MergeSort(secondHalf);

   //Merge the sorted halves 
   return Merge(firstHalf, secondHalf);
}

Node GetMiddle(Node head)
{
   if (head == null || head.Next == null)
       return null;

   Node slow = head;
   Node fast = head;
   while ((fast.Next != null) && (fast.Next.Next != null))
   {
       slow = slow.Next;
       fast = fast.Next.Next;
   }
   return slow;
}

之后,listlist2是两个大小大致相同的列表。连接它们将产生原始列表。当然,fast = fast->next->next需要进一步关注;这只是为了展示一般原则。

在此处输入图像描述

于 2013-11-02T15:30:50.043 回答
0

合并排序是一种分而治之的算法。

数组擅长在中间划分。

链表在中间划分是低效的,所以让我们在遍历列表的同时划分它们。

Take the first  element and put it in list 1.
Take the second element and put it in list 2.
Take the third  element and put it in list 1.
...

您现在已经有效地将列表分成两半,并且通过自下而上的合并排序,您可以在您仍在遍历第一个列表时开始合并步骤,将其分为奇数和偶数。

于 2013-11-02T06:16:47.613 回答
0

您可以通过两种方式进行合并排序。首先,您将列表分成两半,然后对这两个部分递归地应用合并排序并合并结果。但是还有另一种方法。您可以将列表拆分成对,然后递归地合并对,直到得到单个列表,即结果。请参阅 ghc haskell 中 Data.List.sort 的示例实现。该算法可以通过在开始时为一些适当数量的对生成进程(或线程)来并行化,然后也可以合并它们的结果直到有一个。

于 2013-11-02T19:45:39.610 回答
0

我想包含一个实际处理并行工作的版本(使用本机 Windows 线程池)。

您不想将工作一直放在分割递归树的线程中。您只想安排与 CPU 一样多的工作。这意味着您必须知道有多少(逻辑)CPU。例如,如果您有 8 个内核,那么第一个

  • 初始调用:1 个线程
  • 第一次递归:变成2个线程
  • 第二次递归:变成4个线程
  • 第三次递归:变成8个线程
  • 第四次递归:在不拆分更多线程的情况下执行工作

通过查询系统中的处理器数量来处理这个问题:

Int32 GetNumberOfProcessors()
{
   SYSTEM_INFO systemInfo;
   GetSystemInfo(ref systemInfo);
   return systemInfo.dwNumberOfProcessors;
}

然后我们改变递归MergeSort函数来支持一个numberOfProcessors参数:

public Node MergeSort(Node head)
{
    return MergeSort(head, GetNumberOfProcessors());
}

每次我们递归时,我们将numberOfProcessors/2. 当递归函数停止看到至少两个可用处理器时,它停止将工作放入线程池,并在同一个线程上计算它。

Node MergeSort(Node head, Int32 numberOfProcessors)
{
   if ((head == null) || (head.Next == null))
      return head;

   Node firstHalf = head;
   Node middle = GetMiddle(head);
   Node secondHalf = middle.Next;
   middle.Next = null;

   //Sort the lower and upper halves
   if (numberOfProcessors >= 2)
   {
      //Throw the work into the thread pool, since we have CPUs left
      MergeSortOnTheadPool(ref firstHalf, ref secondHalf, numberOfProcessors / 2);

      //i only split this into a separate function to keep 
      //the code short and easily readable
   } 
   else
   {
      firstHalf = MergeSort(firstHalf, numberOfProcessors);
      secondHalf = MergeSort(secondHalf, numberOfProcessors);

   }
   //Merge the sorted halves 
   return Merge(firstHalf, secondHalf);
}

可以使用您最喜欢的语言可用机制来完成这项并行工作。由于我实际使用的语言(不是编写此代码的 C#)不支持asynnc-awaitTask Parallel Library或任何其他语言集成并行系统,因此我们采用老式方式:Event with Interlocked操作。这是我曾经在 AMD 白皮书中读到的一种技术;完成他们的技巧来消除微妙的竞争条件:

void MergeSortOnThreadPool(ref Node listA, ref Node listB)
{
   Int32 nActiveThreads = 1; //Yes 1, to stop a race condition
   using (Event doneEvent = new Event())
   {
      //Put everything the thread will need into a holder object
      ContextInfo contextA = new Context();
      contextA.DoneEvent = doneEvent;
      contextA.ActiveThreads = AddressOf(nActiveThreads);
      contextA.List = firstHalf;
      contextA.NumberOfProcessors = numberOfProcessors/2;
      InterlockedIncrement(nActiveThreads);
      QueueUserWorkItem(MergeSortThreadProc, contextA);

      //Put everything the second thead will need into another holder object
      ContextInfo contextB = new Context();
      contextB.DoneEvent = doneEvent;
      contextB.ActiveThreads = AddressOf(nActiveThreads);
      contextB.List = firstHalf;
      contextB.NumberOfProcessors = numberOfProcessors/2;
      InterlockedIncrement(nActiveThreads);
      QueueUserWorkItem(MergeSortThreadProc, contextB);

      //wait for the threads to finish
      Int32 altDone = InterlockedDecrement(nThreads);
      if (altDone > 0) then
         doneEvent.WaitFor(INFINITE);
   }

   listA = contextA.Result; //returned to the caller as ref parameters
   listB = contextB.Result; 
}

线程池线程过程也必须做一些内务处理;观察它是否是最后一个线程,并将事件设置为出路:

void MergeSortThreadProc(Pointer Data);
{
    Context context = Context(Data);

    Node sorted = MergeSort(context.List, context.ProcessorsRemaining);
    context.Result = sorted;

    //Now do lifetime management
    Int32 altDone = InterlockedDecrement(context.ActiveCount^);
    if (altDone <= 0)
       context.DoneEvent.SetEvent;
}

注意:任何代码都会发布到公共领域。无需归属。

于 2013-11-02T22:51:54.547 回答
0

对于低效的解决方案,请使用快速排序算法:列表中的第一个元素用作枢轴,将未排序的列表分成三个(这使用 O(n) 时间)。然后,您在单独的线程中递归地对较低和较高的子列表进行排序。结果是通过将下部子列表与等于枢轴的键的子列表连接起来,然后在 O(1) 额外步骤中连接上部子列表(而不是慢速合并)。

于 2015-02-13T10:25:30.853 回答