我有两个线程,它们使用两个不同的功能。第一个从头到尾搜索,第二个从头到尾搜索。
现在我正在使用Thread.Sleep(10)
同步,但它需要太多时间,并且在这种情况下无法进行测试。
知道如何同步具有不同功能的两个线程吗?
我有两个线程,它们使用两个不同的功能。第一个从头到尾搜索,第二个从头到尾搜索。
现在我正在使用Thread.Sleep(10)
同步,但它需要太多时间,并且在这种情况下无法进行测试。
知道如何同步具有不同功能的两个线程吗?
这取决于你想做什么。
Interlocked
在没有锁或其他一些机制的情况下执行此操作(见下文)Task
对象Barrier
鉴于您正在进行 A* 搜索,您可能需要将所有两个/三个组合起来:
Barrier
协调步骤并更新步骤之间的开放集Task
CancellationToken
允许调用者取消搜索的对象。建议的另一个答案Semaphore
- 这并不适合您的需求(请参阅下面的评论)。
Barrier
可用于这样的搜索:
Interlocked
第一部分是检查成功。如果你想保持“无锁”,你可以使用Interlocked
来做到这一点,一般模式是:
// global success indicator
private const int NotDone = 0;
private const int AllDone = 1;
private int _allDone = NotDone;
private GeneralSearchFunction(bool directionForward) {
bool iFoundIt = false;
... do some search operations that won't take much time
if (iFoundIt) {
// set _allDone to AllDone!
Interlocked.Exchange(ref _allDone, AllDone);
return;
}
... do more work
// after one or a few iterations, if this thread is still going
// see if another thread has set _allDone to AllDone
if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
return; // if they did, then exit
}
... loop to the top and carry on working
}
// main thread:
Thread t1 = new Thread(() => GeneralSearchFunction(true));
Thread t2 = new Thread(() => GeneralSearchFunction(false));
t1.Start(); t2.Start(); // start both
t1.Join(); t2.Join();
// when this gets to here, one of them will have succeeded
这是任何类型的成功或取消令牌的一般模式:
所以一个实现看起来像:
class Program
{
// global success indicator
private const int NotDone = 0;
private const int AllDone = 1;
private static int _allDone = NotDone;
private static int _forwardsCount = 0; // counters to simulate a "find"
private static int _backwardsCount = 0; // counters to simulate a "find"
static void Main(string[] args) {
var searchItem = "foo";
Thread t1 = new Thread(() => DoSearchWithBarrier(SearchForwards, searchItem));
Thread t2 = new Thread(() => DoSearchWithBarrier(SearchBackwards, searchItem));
t1.Start(); t2.Start();
t1.Join(); t2.Join();
Console.WriteLine("all done");
}
private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem) {
while (!searchMethod(searchItem)) {
// after one or a few iterations, if this thread is still going
// see if another thread has set _allDone to AllDone
if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
return; // if they did, then exit
}
}
Interlocked.Exchange(ref _allDone, AllDone);
}
public static bool SearchForwards(string item) {
// return true if we "found it", false if not
return (Interlocked.Increment(ref _forwardsCount) == 10);
}
public static bool SearchBackwards(string item) {
// return true if we "found it", false if not
return (Interlocked.Increment(ref _backwardsCount) == 20); // make this less than 10 to find it backwards first
}
}
当然,如果不使用,这将不是 .NET 4.5 Task
:
class Program
{
private static int _forwardsCount = 0; // counters to simulate a "find"
private static int _backwardsCount = 0; // counters to simulate a "find"
static void Main(string[] args) {
var searchItem = "foo";
var tokenSource = new CancellationTokenSource();
var allDone = tokenSource.Token;
Task t1 = Task.Factory.StartNew(() => DoSearchWithBarrier(SearchForwards, searchItem, tokenSource, allDone), allDone);
Task t2 = Task.Factory.StartNew(() => DoSearchWithBarrier(SearchBackwards, searchItem, tokenSource, allDone), allDone);
Task.WaitAll(new[] {t2, t2});
Console.WriteLine("all done");
}
private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem, CancellationTokenSource tokenSource, CancellationToken allDone) {
while (!searchMethod(searchItem)) {
if (allDone.IsCancellationRequested) {
return;
}
}
tokenSource.Cancel();
}
...
}
但是,现在您已经将CancellationToken
用于错误的事情 - 实际上这应该保留给搜索的调用者以取消搜索,因此您应该使用CancellationToken
来检查请求的取消(只有调用者需要tokenSource
),并获得不同的成功同步(如Interlocked
上面的示例)退出。
由于许多原因,这变得更加困难,但有一个简单的方法。将Barrier
(.NET 4 的新功能)与退出信号结合使用,您可以:
线程同步有许多不同的方法,具体取决于您想要实现的目标。有些是:
Barrier
:如果您打算同时运行向前和向后搜索,这可能是最合适的。它也大声说出你的意图,即“所有线程都无法继续,直到他们每个人都达到障碍”ManualResetEvent
- 当一个线程释放一个信号时,所有其他线程都可以继续,直到它再次被设置。AutoResetEvent
类似,只是它只允许一个线程在再次阻塞之前继续。Interlocked
- 结合SpinWait
这是一个可行的无锁解决方案Semaphore
- 可以使用,但不太适合您的场景我在Barrier
这里只提供了一个完整的样本,因为它似乎最适合你的情况。Barrier
是性能最高的之一,仅次于ManualResetEventSlim
( ref.albahari ),但使用ManualResetEvent
将需要更复杂的代码。
要查看的其他技术,如果上述方法都不适合您,Monitor.Wait
并且Monitor.Pulse
(现在您正在使用锁定)和任务继续。后者更用于将数据从一个异步操作传递到另一个,但它可以用于您的场景。而且,与答案顶部的样本一样,您更有可能Task
与Barrier
使用一个而不是使用另一个相结合。Task Continuations 可用于对 A* 搜索中的开放集进行后步骤修订,但Barrier
无论如何您都可以轻松地使用它。
这段代码,使用Barrier
作品。从本质上讲,DoSearchWithBarrier
是唯一进行同步的位 - 其余的都是设置和拆卸代码。
class Program {
...
private static int _forwardsCount = 0; // counters to simulate a "find"
private static int _backwardsCount = 0; // counters to simulate a "find"
static void Main(string[] args) {
Barrier barrier = new Barrier(numThreads,
b => Console.WriteLine("Completed search iteration {0}", b.CurrentPhaseNumber));
var searchItem = "foo";
Thread t1 = new Thread(() => DoSearchWithBarrier(SearchForwards, searchItem, barrier));
Thread t2 = new Thread(() => DoSearchWithBarrier(SearchBackwards, searchItem, barrier));
t1.Start(); Console.WriteLine("Started t1");
t2.Start(); Console.WriteLine("Started t2");
t1.Join(); Console.WriteLine("t1 done");
t2.Join(); Console.WriteLine("t2 done");
Console.WriteLine("all done");
}
private static void DoSearchWithBarrier(Func<string, bool> searchMethod, string searchItem, Barrier barrier) {
while (!searchMethod(searchItem)) {
// while we haven't found it, wait for the other thread to catch up
barrier.SignalAndWait(); // check for the other thread AFTER the barrier
if (Interlocked.CompareExchange(ref _allDone, NotDone, NotDone) == AllDone) {
return;
}
}
// set success signal on this thread BEFORE the barrier
Interlocked.Exchange(ref _allDone, AllDone);
// wait for the other thread, and then exit (and it will too)
barrier.SignalAndWait();
}
...
}
这里有两件事:
Barrier
用于同步两个线程,因此在另一个线程赶上之前它们无法执行下一步Interlocked
正如我首先描述的那样,退出信号使用。为 A* 搜索实现此功能与上述示例非常相似。一旦所有线程都到达障碍并因此继续,您可以使用 ManualResetEvent 或简单lock
的然后让一个(并且只有一个)修改开放集。
Semaphore
这可能不是您想要的,因为当您拥有有限的资源池时最常使用它,需要访问的资源用户多于您拥有的资源。
想想在工作食堂角落里有 CoD 的 PlayStation - 4 个控制器,20 个人等待(WaitOne
)使用它,一旦你的角色死了,你Release
就是控制器,其他人取代你的位置。没有强制执行特定的 FIFO/LIFO 排序,实际上Release
可以由您使用的保镖调用以防止不可避免的战斗(即不强制执行线程标识)。
lock
简单的成功指示您可以通过锁定实现相同的目的。两者Interlocked
并lock
确保您在读取线程之间的公共变量时看不到任何内存缓存问题:
private readonly object _syncAllDone = new object();
...
if (iFoundIt) {
lock (_syncAllDone) { _allDone = AllDone };
return;
}
...
// see if another thread has set _allDone to AllDone
lock (_syncAllDone) {
if (_allDone == AllDone) {
return; // if they did, then exit
}
}
这样做的缺点是锁定可能会很慢,但您需要测试您的情况。优点是,如果您仍然使用锁来执行其他操作,例如从线程中写出结果,则不会有任何额外的开销。
ManualResetEvent
简单的成功指示这并不是重置事件的真正预期用途,但它可以工作。(如果使用 .NET 4 或更高版本,请使用ManualResetEventSlim
代替ManualResetEvent
):
private ManualResetEvent _mreAllDone = new ManualResetEvent(true); // will not block a thread
...
if (iFoundIt) {
_mreAllDone.Reset(); // stop other threads proceeding
return;
}
...
// see if another thread has reset _mreAllDone by testing with a 0 timeout
if (!_mreAllDone.WaitOne(0)) {
return; // if they did, then exit
}
所有其他方法都变得更加复杂,因为您必须进行双向连续检查以防止竞争条件和永久阻塞的线程。我不推荐它们,所以我不会在这里提供示例(它会很长而且很复杂)。
参考:
thread.Join()
可能是你的追求。这将使您当前的线程阻塞,直到另一个线程结束。
可以通过将所有线程同步到一个点来加入多个线程。
List<Thread> threads = new List<Thread>();
threads.Add(new Thread(new ThreadStart(<Actual method here>)));
threads.Add(new Thread(new ThreadStart(<Another method here>)));
threads.Add(new Thread(new ThreadStart(<Another method here>)));
foreach(Thread thread in threads)
{
thread.Start();
}
//All your threads are now running
foreach(Thread thread in threads)
{
thread.Join();
}
//You wont get here until all those threads have finished
在某些情况下,您可以使用 AutoResetEvent 来等待线程的某些结果。您可以将任务用于某些工作人员的启动/停止/等待结果。您可以将 Producer/Consumer 模式与 BlockingCollection 一起使用,以防您的函数吃掉一些数据并返回一些东西的集合。