3

所以,我正在做这个项目,我正在用 Chapel 计算语言写作。我已经编写了该程序,并且在非并行运行时它可以完美运行。

但是当我添加forall需要并行化的语句时,程序运行速度确实快了很多,但它没有提供我需要的结果。我知道是因为我在步骤 1、3、5 和 7 中存在竞争条件,j = j - 1;因此我尝试创建j一个同步变量以防止这种竞争条件破坏我的结果,然后我编译、运行并且我的程序永远不会使它脱离了第 1 步,这是第一个同步变量所在的位置,所以我有理由相信这是因为同步变量j.

如果有人对我应该如何并行化或同步有任何见解,以便对我的最终网格进行排序,那就太好了。这是代码:

//SlideSort.chapel_version
//

use Random;
use Time;

config const seed = 2345;
var rs = new RandomStream(seed);
var num: int;
var transferMesh:[0..36530] int;
var copyMesh:[0..36530] int;
var mesh:[0..37883] int;
var i: int;
var z: int;



   proc slideSort(){
      writeln("\nSorting..\n");
/*-------------------------Step 1-------------------------------*/
      writeln("Step 1");
      forall i in 0..36530{
         transferMesh[i] = mesh[i+677];
      }//end for

      forall i in 0..1352{
         forall a in 0..26{
            var value: int = transferMesh[1353*a+i];
            var j$: int = 1353*a+i-1;
               while j$>= 1353*a && transferMesh[j$] > value{
                  transferMesh[j$+1] = transferMesh[j$];
                  j$ = j$ - 1;
               }//end While
               transferMesh[j$+1] = value;
         }//end a_for
      }//end i_for

      forall k in 0..36530{
         mesh[k+677] = transferMesh[k];
      }//end For_k
/*--------------------------Step 2--------------------------------*/
      writeln("Step 2");
         forall k in 677..37207{
            transferMesh[k-677] = mesh[k];
         }//end k_for

         forall k in 0..1352{
            for z in 0..26{
               copyMesh[k+1353*z] = transferMesh[27*k+z];
            }//end z_for
         }//end k_for

         forall k in 0..36530{
            mesh[k+677] = copyMesh[k];
         }//end k_for
/*--------------------------Step 3--------------------------------*/
      writeln("Step 3");
         forall i in 0..36530{
            transferMesh[i] = mesh[i+677];
         }//end for

         forall i in 0..1352{
            forall a in 0..26{
               var value: int = transferMesh[1353*a+i];
               var j: int = 1353*a+i-1;
                  while j>= 1353*a && transferMesh[j] > value{
                     transferMesh[j+1] = transferMesh[j];
                     j = j - 1;
                  }//end While
                  transferMesh[j+1] = value;
            }//end a_for
         }//end i_for

         forall k in 0..36530{
            mesh[k+677] = transferMesh[k];
         }//end For_k
/*--------------------------Step 4--------------------------------*/
      writeln("Step 4");
         forall k in 677..37207{
            transferMesh[k-677] = mesh[k];
         }//end for

         forall k in 0..1352{
            forall z in 0..26{
               copyMesh[27*k+z] = transferMesh[k+1353*z];
            }//end z_for
         }//end k_for

         forall k in 0..36530{
            mesh[k+677] = copyMesh[k];
         }//end k_for
/*--------------------------Step 5--------------------------------*/
      writeln("Step 5");
         forall i in 0..36530{
            transferMesh[i] = mesh[i+677];
         }//end for

         forall i in 0..1352{
            forall a in 0..26{
               var value: int = transferMesh[1353*a+i];
               var j: int = 1353*a+i-1;
                  while j>= 1353*a && transferMesh[j] > value{
                     transferMesh[j+1] = transferMesh[j];
                     j = j - 1;
                  }//end While
                  transferMesh[j+1] = value;
            }//end a_for
         }//end i_for

         forall k in 0..36530{
            mesh[k+677] = transferMesh[k];
         }//end For_k
/*--------------------------Step 6--------------------------------*/
      writeln("Step 6");
        /*This is the padding step, so, since I already have a padded array
          I will simply just sort my padded array in step 7
        */
/*--------------------------Step 7--------------------------------*/
      writeln("Step 7\n");
         forall k in 0..1352{
            forall a in 0..27{
               var value: int = mesh[1353*a+k];
               var j: int = 1353*a+k-1;
               while j >= 1353 *a && mesh[j] > value{
                  mesh[j+1] = mesh [j];
                  j = j - 1;
               }//end while
               mesh[j+1] = value;
            }//end a_for
         }//end k_for


   }//slideSort END
/*^^^^^^^^^^^^^^^^^^^^^^^end slidesort^^^^^^^^^^^^^^^^^^^^^^^^^^^^*/

   proc isSorted() :int{
      var sorted: int = 0;
      for i in 0..37882{
         if mesh[i+1] < mesh[i]{
            writeln("NOT SORTED :(");
            sorted = 1;
            break;
         }//if
      }//end For
      return sorted;
   }// isSorted END



proc main(){

/*Padding array with -INF*/
   for i in 0..676 do mesh[i] = -1000000;

/*Filling array with random ints*/
   for i in 677..37207{
      num = (301 * rs.getNext()): int;
      mesh[i] = num;
   }//end for

/*Padding array with +INF*/
   for i in 37208..37883 do mesh[i] = 1000000;


   writeln("\n200th Element: ", mesh[199],"\n1000th Element: ", mesh[999]);
   writeln("30000th Element: ", mesh[29999], "\n37300th Element: ", mesh[37299]);

   const startTime = getCurrentTime();
   slideSort();
   const endTime = getCurrentTime();

   writeln("\n200th Element: ", mesh[199], "\n1000th Element: ", mesh[999]);
   writeln("30000th Element: ", mesh[29999], "\n37300th Element: ", mesh[37299]);

   writeln("\n\nElapsed time was: ", (endTime - startTime));

   if(isSorted()==0) then writeln("\nYep the mesh is sorted via SlideSort :)");

   write("\nWould you like to print out every 100th element of the Mesh?\n",
           "'Y' for yes\n",
           "'N' for no\n");
   var print: string = "N";
   print = stdin.read(string);

   if print == "Y" || print == "y"{
      for i in 0..37883 by 100{
         write(mesh[i],"\n");
      }//end innerFor
   }//end if

}//end MAIN
4

1 回答 1

2

它与同步变量没有太大关系。您的双循环如下所示:

forall i in 0..1352 do
    forall a in 0..26
    {
        var j = 1353*a+i;
        var v = transferMesh[j];

        while( j>= 1353*a )
        {
            if( transferMesh[j] <= v )
                break;

            transferMesh[j+1] = transferMesh[j];
            j--;
        }

        transferMesh[j+1] = v;
    }

这是数据竞争的明显来源。您进行了测试transferMesh[j],但是由于另一个具有相同 a 的 i 可以同时迭代,它可能会同时修改该值,从而导致未定义的结果。

对于这些循环中的每一个,您应该每个块只允许一个工作人员(因此每个 a 的值)。因此,您应该只在 a 上并行迭代,即forall a in 0..26 do for i in 0..1352 {...}


因此很容易获得校正后的代码:

//SlideSort.chapel_version
//

use Random;
use Time;

config const seed = 2345;
var rs = new RandomStream(seed);
var num: int;
var transferMesh:[0..36530] int;
var copyMesh:[0..36530] int;
var mesh:[0..37883] int;
var i: int;
var z: int;



proc slideSort()
{
  writeln("\nSorting..\n");
  /*-------------------------Step 1-------------------------------*/
  writeln("Step 1");
  forall i in 0..36530
  {
    transferMesh[i] = mesh[i+677];
  }//end for

  forall a in 0..26
  {
    for i in 0..1352
    {
      var value: int = transferMesh[1353*a+i];
      var j: int = 1353*a+i-1;
      while j>= 1353*a && transferMesh[j] > value
      {
        transferMesh[j+1] = transferMesh[j];
        j = j - 1;
      }//end While
      transferMesh[j+1] = value;
    }//end i_for
  }//end a_for

  forall k in 0..36530
  {
    mesh[k+677] = transferMesh[k];
  }//end For_k
  /*--------------------------Step 2--------------------------------*/
  writeln("Step 2");
  forall k in 677..37207
  {
    transferMesh[k-677] = mesh[k];
  }//end k_for

  forall z in 0..26
  {
    forall k in 0..1352
    {
      copyMesh[k+1353*z] = transferMesh[27*k+z];
    }//end k_for
  }//end z_for

  forall k in 0..36530
  {
    mesh[k+677] = copyMesh[k];
  }//end k_for
  /*--------------------------Step 3--------------------------------*/
  writeln("Step 3");
  forall i in 0..36530
  {
    transferMesh[i] = mesh[i+677];
  }//end for

  forall a in 0..26
  {
    for i in 0..1352
    {
      var value: int = transferMesh[1353*a+i];
      var j: int = 1353*a+i-1;
      while j>= 1353*a && transferMesh[j] > value
      {
        transferMesh[j+1] = transferMesh[j];
        j = j - 1;
      }//end While
      transferMesh[j+1] = value;
    }//end i_for
  }//end a_for

  forall k in 0..36530
  {
    mesh[k+677] = transferMesh[k];
  }//end For_k
  /*--------------------------Step 4--------------------------------*/
  writeln("Step 4");
  forall k in 677..37207
  {
    transferMesh[k-677] = mesh[k];
  }//end for

  forall k in 0..1352
  {
    for z in 0..26
    {
      copyMesh[27*k+z] = transferMesh[k+1353*z];
    }//end z_for
  }//end k_for

  forall k in 0..36530
  {
    mesh[k+677] = copyMesh[k];
  }//end k_for
  /*--------------------------Step 5--------------------------------*/
  writeln("Step 5");
  forall i in 0..36530
  {
    transferMesh[i] = mesh[i+677];
  }//end for

  forall a in 0..26
  {
    for i in 0..1352
    {
      var value: int = transferMesh[1353*a+i];
      var j: int = 1353*a+i-1;
      while j>= 1353*a && transferMesh[j] > value
      {
        transferMesh[j+1] = transferMesh[j];
        j = j - 1;
      }//end While
      transferMesh[j+1] = value;
    }//end i_for
  }//end a_for

  forall k in 0..36530
  {
    mesh[k+677] = transferMesh[k];
  }//end For_k
  /*--------------------------Step 6--------------------------------*/
  writeln("Step 6");
  /*This is the padding step, so, since I already have a padded array
    I will simply just sort my padded array in step 7
   */
  /*--------------------------Step 7--------------------------------*/
  writeln("Step 7\n");
  forall a in 0..27
  {
    for k in 0..1352
    {
      var value: int = mesh[1353*a+k];
      var j: int = 1353*a+k-1;
      while j >= 1353 *a && mesh[j] > value
      {
        mesh[j+1] = mesh [j];
        j = j - 1;
      }//end while
      mesh[j+1] = value;
    }//end k_for
  }//end a_for


}//slideSort END
/*^^^^^^^^^^^^^^^^^^^^^^^end slidesort^^^^^^^^^^^^^^^^^^^^^^^^^^^^*/

proc isSorted() :int
{
  var sorted: int = 0;
  for i in 0..37882
  {
    if mesh[i+1] < mesh[i]
    {
      writeln("NOT SORTED :(");
      sorted = 1;
      break;
    }//if
  }//end For
  return sorted;
}// isSorted END



proc main()
{

  /*Padding array with -INF*/
  for i in 0..676 do mesh[i] = -1000000;

  /*Filling array with random ints*/
  for i in 677..37207
  {
    num = (301 * rs.getNext()): int;
    mesh[i] = num;
  }//end for

  /*Padding array with +INF*/
  for i in 37208..37883 do mesh[i] = 1000000;


  writeln("\n200th Element: ", mesh[199],"\n1000th Element: ", mesh[999]);
  writeln("30000th Element: ", mesh[29999], "\n37300th Element: ", mesh[37299]);

  const startTime = getCurrentTime();
  slideSort();
  const endTime = getCurrentTime();

  writeln("\n200th Element: ", mesh[199], "\n1000th Element: ", mesh[999]);
  writeln("30000th Element: ", mesh[29999], "\n37300th Element: ", mesh[37299]);

  writeln("\n\nElapsed time was: ", (endTime - startTime));

  if(isSorted()==0) then writeln("\nYep the mesh is sorted via SlideSort :)");

}//end MAIN

结果 :

$ chpl sort.chpl ; ./a.out | tail -n 1
Yep the mesh is sorted via SlideSort :)
于 2014-12-05T17:18:59.507 回答