0

我有两个线程,它们使用两个不同的功能。第一个从头到尾搜索,第二个从头到尾搜索。

现在我正在使用Thread.Sleep(10)同步,但它需要太多时间,并且在这种情况下无法进行测试。

知道如何同步具有不同功能的两个线程吗?

4

3 回答 3

5

这取决于你想做什么。

  • 如果您有两个线程,并且只想在另一个达到“成功”时退出一个(或n 个线程,并且您想在一个第一个达到“成功”时退出它们),您只需要定期检查每个线程是否成功。
    • 用于Interlocked在没有锁或其他一些机制的情况下执行此操作(见下文)
    • 使用可取消的Task对象
  • 如果您需要分阶段进行搜索,其中每个线程执行某些操作,然后等待另一个线程赶上,您需要一种不同的方法。
    • 采用Barrier

鉴于您正在进行 A* 搜索,您可能需要将所有两个/三个组合起来:

  • Barrier协调步骤并更新步骤之间的开放集
  • 如果另一个线程成功,成功发出信号以计算何时退出线程
  • TaskCancellationToken允许调用者取消搜索的对象。

建议的另一个答案Semaphore- 这并不适合您的需求(请参阅下面的评论)。

Barrier可用于这样的搜索:

  • 进入算法的第 0 步
  • n个线程将当前级别分成相等的部分并在每一半上工作,当每个完成时,它会发出信号并等待另一个线程
  • 当所有线程都准备好后,继续下一步并重复搜索

简单检查退出 -Interlocked

第一部分是检查成功。如果你想保持“无锁”,你可以使用Interlocked来做到这一点,一般模式是:

// global success indicator
private const int NotDone = 0;
private const int AllDone = 1;
private int _allDone = NotDone;

private GeneralSearchFunction(bool directionForward) {
  bool iFoundIt = false;
  ... do some search operations that won't take much time
  if (iFoundIt) {
    // set _allDone to AllDone!
    Interlocked.Exchange(ref _allDone, AllDone);
    return;
  }
  ... do more work
  // after one or a few iterations, if this thread is still going
  //   see if another thread has set _allDone to AllDone
  if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
    return; // if they did, then exit
  }
  ... loop to the top and carry on working
}

// main thread:
  Thread t1 = new Thread(() => GeneralSearchFunction(true));
  Thread t2 = new Thread(() => GeneralSearchFunction(false));
  t1.Start(); t2.Start(); // start both
  t1.Join(); t2.Join(); 
  // when this gets to here, one of them will have succeeded

这是任何类型的成功或取消令牌的一般模式:

  • 做一些工作
  • 如果你成功了,设置一个信号每隔一个线程定期检查
  • 如果你还没有成功,那么在工作的中间,无论是每次迭代,还是每隔几次迭代,检查这个线程是否应该退出

所以一个实现看起来像:

class Program
{
    // global success indicator
    private const int NotDone = 0;
    private const int AllDone = 1;
    private static int _allDone = NotDone;

    private static int _forwardsCount = 0;    // counters to simulate a "find"
    private static int _backwardsCount = 0;   // counters to simulate a "find"
    static void Main(string[] args) {
        var searchItem = "foo";
        Thread t1 = new Thread(() => DoSearchWithBarrier(SearchForwards, searchItem));
        Thread t2 = new Thread(() => DoSearchWithBarrier(SearchBackwards, searchItem));
        t1.Start(); t2.Start();
        t1.Join(); t2.Join();
        Console.WriteLine("all done");
    }
    private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem) {
        while (!searchMethod(searchItem)) {
            // after one or a few iterations, if this thread is still going
            //   see if another thread has set _allDone to AllDone
            if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
                return; // if they did, then exit
            }
        }
        Interlocked.Exchange(ref _allDone, AllDone);
    }
    public static bool SearchForwards(string item) {
        //  return true if we "found it", false if not
        return (Interlocked.Increment(ref _forwardsCount) == 10);
    }
    public static bool SearchBackwards(string item) {
        //  return true if we "found it", false if not
        return (Interlocked.Increment(ref _backwardsCount) == 20); // make this less than 10 to find it backwards first
    }
}

使用任务达到同样的目的

当然,如果不使用,这将不是 .NET 4.5 Task

class Program
{
    private static int _forwardsCount = 0;    // counters to simulate a "find"
    private static int _backwardsCount = 0;   // counters to simulate a "find"
    static void Main(string[] args) {
        var searchItem = "foo";
        var tokenSource = new CancellationTokenSource();
        var allDone = tokenSource.Token;
        Task t1 = Task.Factory.StartNew(() => DoSearchWithBarrier(SearchForwards, searchItem, tokenSource, allDone), allDone);
        Task t2 = Task.Factory.StartNew(() => DoSearchWithBarrier(SearchBackwards, searchItem, tokenSource, allDone), allDone);
        Task.WaitAll(new[] {t2, t2});
        Console.WriteLine("all done");
    }
    private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem, CancellationTokenSource tokenSource, CancellationToken allDone) {
        while (!searchMethod(searchItem)) {
            if (allDone.IsCancellationRequested) {
                return;
            }
        }
        tokenSource.Cancel();
    }
    ...
}

但是,现在您已经将CancellationToken用于错误的事情 - 实际上这应该保留给搜索的调用者以取消搜索,因此您应该使用CancellationToken来检查请求的取消(只有调用者需要tokenSource),并获得不同的成功同步(如Interlocked上面的示例)退出。

相位/步同步

由于许多原因,这变得更加困难,但有一个简单的方法。将Barrier(.NET 4 的新功能)与退出信号结合使用,您可以:

  1. 为当前步骤执行分配线程的工作,然后等待另一个线程赶上,然后再进行另一次迭代
  2. 当一个成功时退出两个线程

线程同步有许多不同的方法,具体取决于您想要实现的目标。有些是:

  • Barrier:如果您打算同时运行向前和向后搜索,这可能是最合适的。它也大声说出你的意图,即“所有线程都无法继续,直到他们每个人都达到障碍”
  • ManualResetEvent- 当一个线程释放一个信号时,所有其他线程都可以继续,直到它再次被设置。AutoResetEvent类似,只是它只允许一个线程在再次阻塞之前继续。
  • Interlocked- 结合SpinWait这是一个可行的无锁解决方案
  • Semaphore- 可以使用,但不太适合您的场景

我在Barrier这里只提供了一个完整的样本,因为它似乎最适合你的情况。Barrier是性能最高的之一,仅次于ManualResetEventSlim( ref.albahari ),但使用ManualResetEvent将需要更复杂的代码。

要查看的其他技术,如果上述方法都不适合您,Monitor.Wait并且Monitor.Pulse(现在您正在使用锁定)和任务继续。后者更用于将数据从一个异步操作传递到另一个,但它可以用于您的场景。而且,与答案顶部的样本一样,您更有可能TaskBarrier使用一个而不是使用另一个相结合。Task Continuations 可用于对 A* 搜索中的开放集进行后步骤修订,但Barrier无论如何您都可以轻松地使用它。

这段代码,使用Barrier作品。从本质上讲,DoSearchWithBarrier是唯一进行同步的位 - 其余的都是设置和拆卸代码。

class Program {
    ...
    private static int _forwardsCount = 0;    // counters to simulate a "find"
    private static int _backwardsCount = 0;   // counters to simulate a "find"
    static void Main(string[] args) {
        Barrier barrier = new Barrier(numThreads, 
            b => Console.WriteLine("Completed search iteration {0}", b.CurrentPhaseNumber));
        var searchItem = "foo";
        Thread t1 = new Thread(() => DoSearchWithBarrier(SearchForwards, searchItem, barrier));
        Thread t2 = new Thread(() => DoSearchWithBarrier(SearchBackwards, searchItem, barrier));
        t1.Start(); Console.WriteLine("Started t1");
        t2.Start(); Console.WriteLine("Started t2");
        t1.Join(); Console.WriteLine("t1 done");
        t2.Join(); Console.WriteLine("t2 done");
        Console.WriteLine("all done");
    }
    private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem, Barrier barrier) {
        while (!searchMethod(searchItem)) {
            // while we haven't found it, wait for the other thread to catch up
            barrier.SignalAndWait(); // check for the other thread AFTER the barrier
            if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
                return;
            }
        }
        // set success signal on this thread BEFORE the barrier
        Interlocked.Exchange(ref _allDone, AllDone);
        // wait for the other thread, and then exit (and it will too)
        barrier.SignalAndWait();
    }
    ...
}

这里有两件事:

  • Barrier用于同步两个线程,因此在另一个线程赶上之前它们无法执行下一步
  • Interlocked正如我首先描述的那样,退出信号使用。

为 A* 搜索实现此功能与上述示例非常相似。一旦所有线程都到达障碍并因此继续,您可以使用 ManualResetEvent 或简单lock的然后让一个(并且只有一个)修改开放集。

关于Semaphore

这可能不是您想要的,因为当您拥有有限的资源池时最常使用它,需要访问的资源用户多于您拥有的资源。

想想在工作食堂角落里有 CoD 的 PlayStation - 4 个控制器,20 个人等待(WaitOne)使用它,一旦你的角色死了,你Release就是控制器,其他人取代你的位置。没有强制执行特定的 FIFO/LIFO 排序,实际上Release可以由您使用的保镖调用以防止不可避免的战斗(即不强制执行线程标识)。

简单检查退出 - 其他方法

用于lock简单的成功指示

您可以通过锁定实现相同的目的。两者Interlockedlock确保您在读取线程之间的公共变量时看不到任何内存缓存问题:

private readonly object _syncAllDone = new object();
...
  if (iFoundIt) {
    lock (_syncAllDone) { _allDone = AllDone };
    return;
  }
  ...
  //   see if another thread has set _allDone to AllDone
  lock (_syncAllDone) {
    if (_allDone == AllDone) {
      return; // if they did, then exit
    }
  }

这样做的缺点是锁定可能会很慢,但您需要测试您的情况。优点是,如果您仍然使用锁来执行其他操作,例如从线程中写出结果,则不会有任何额外的开销。

用于ManualResetEvent简单的成功指示

这并不是重置事件的真正预期用途,但它可以工作。(如果使用 .NET 4 或更高版本,请使用ManualResetEventSlim代替ManualResetEvent):

private ManualResetEvent _mreAllDone = new ManualResetEvent(true); // will not block a thread
...
  if (iFoundIt) {
    _mreAllDone.Reset(); // stop other threads proceeding
    return;
  }
  ...
  //   see if another thread has reset _mreAllDone by testing with a 0 timeout
  if (!_mreAllDone.WaitOne(0)) {
      return; // if they did, then exit
  }

相位同步 - 其他方法

所有其他方法都变得更加复杂,因为您必须进行双向连续检查以防止竞争条件和永久阻塞的线程。我不推荐它们,所以我不会在这里提供示例(它会很长而且很复杂)。


参考:

于 2013-06-23T16:13:18.123 回答
1
thread.Join() 

可能是你的追求。这将使您当前的线程阻塞,直到另一个线程结束。

可以通过将所有线程同步到一个点来加入多个线程。

List<Thread> threads = new List<Thread>();
threads.Add(new Thread(new ThreadStart(<Actual method here>)));
threads.Add(new Thread(new ThreadStart(<Another method here>)));
threads.Add(new Thread(new ThreadStart(<Another method here>)));

foreach(Thread thread in threads)
{
  thread.Start();
}
//All your threads are now running
foreach(Thread thread in threads)
{
  thread.Join();
}
//You wont get here until all those threads have finished
于 2013-06-21T12:09:07.753 回答
0

在某些情况下,您可以使用 AutoResetEvent 来等待线程的某些结果。您可以将任务用于某些工作人员的启动/停止/等待结果。您可以将 Producer/Consumer 模式与 BlockingCollection 一起使用,以防您的函数吃掉一些数据并返回一些东西的集合。

于 2013-06-21T11:48:53.690 回答