6

简短版:如何从单个线程释放多个锁,而不会在中途被抢占?

我有一个设计为在 N 核机器上运行的程序。它由一个主线程和N个工作线程组成。每个线程(包括主线程)都有一个可以阻塞的信号量。通常,每个工作线程在减少其信号量时都会被阻塞,并且主线程正在运行。但是,有时,主线程应该唤醒工作线程在一定时间内做他们的事情,然后阻塞自己的信号量,等待它们全部重新进入睡眠状态。像这样:

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_semaphore = semaphore(0)

    while True:
        ...do some work...
        workers_to_wake = foo()
        for i in workers_to_wake:
            worker_semaphore[i].increment() # wake up worker n
        for i in workers_to_wake:
            main_semaphore.decrement() # wait for all workers

def worker_thread(i):
    while True:
        worker_semaphore(i).decrement() # wait to be woken
        ...do some work...
        main_semaphore.increment() # report done with step

一切都很好。问题是,其中一个被唤醒的工作线程可能会在唤醒工作线程的中途抢占主线程:例如,当 Windows 调度程序决定提高该工作线程的优先级时,就会发生这种情况。这不会导致死锁,但效率很低,因为其余线程保持休眠状态,直到抢占的工作人员完成其工作。它基本上是优先级反转,主线程在其中一个工作线程上等待,而一些工作线程在主线程上等待。

我可能会为此找出特定于操作系统和调度程序的技巧,例如在 Windows 下禁用优先级提升,以及摆弄线程优先级和处理器亲和力,但我想要一些跨平台的、健壮和干净的东西。那么:如何以原子方式唤醒一堆线程?

4

4 回答 4

3

TL; 博士

如果你真的必须尽可能多地从你的工作人员那里得到尽可能多的东西,只需使用事件信号量、控制块和屏障来代替你的信号量。但是请注意,这是一个更脆弱的解决方案,因此您需要平衡任何潜在收益与此不利因素。

语境

首先,我需要在我们的讨论中总结更广泛的背景......

您有一个 Windows 图形应用程序。它具有所需的帧速率,因此您需要主线程以该速率运行,以精确的时间间隔调度所有工作人员,以便他们在刷新间隔内完成工作。这意味着您对每个线程的启动和执行时间有非常严格的限制。此外,您的工作线程并不完全相同,因此您不能只使用单个工作队列。

问题

与任何现代操作系统一样,Windows 具有多种同步原语。然而,这些都没有直接提供一次通知多个原语的机制。浏览其他操作系统,我看到了类似的模式;它们都提供了等待多个原语的方法,但没有一个提供触发它们的原子方式。

那么我们能做些什么呢?您需要解决的问题是:

  1. 准确计时所有必需工人的启动时间。
  2. 刺激实际需要在下一帧中运行的工人。

选项

问题 1 最明显的解决方案是使用单个事件信号量,但您也可以使用读/写锁(通过在工作人员完成后获取写锁并让工作人员使用读锁)。所有其他选项不再是原子的,因此需要进一步同步以强制线程执行您想要的操作 - 就像 lossleader 对信号量内锁的建议一样。

但是由于应用程序的时间限制,我们想要一个尽可能减少上下文切换的最佳解决方案,所以让我们看看是否可以使用其中任何一个来解决问题 2... 你如何选择应该运行哪些工作线程如果我们只有一个事件信号量或读/写锁,从主开始?

嗯......读/写锁是一个线程将一些关键数据写入控制块并让许多其他线程从中读取的好方法。为什么不只拥有一个简单的布尔标志数组(每个工作线程一个),您的主线程会更新每一帧?可悲的是,您仍然需要停止执行工作人员,直到计时器弹出。简而言之,我们又回到了信号量和锁定解决方案。

但是,由于您的应用程序的性质,您可以多做一步。您可以依靠这样一个事实,即您知道您的工作人员没有在您的时间切片之外运行,而是使用事件信号量作为一种粗略的锁定形式。

最后的优化(如果您的环境支持它们)是使用屏障而不是主信号量。你知道所有n个线程都需要空闲才能继续,所以坚持下去。

一个解法

应用上述内容,您的伪代码将如下所示:

def main_thread(n):
    main_event = event()
    for i = 1 to n:
        worker_scheduled[i] = False
        spawn_thread(worker_thread, i)
    main_barrier = barrier(n+1)

    while True:
        ...do some work...
        workers_to_wake = foo()
        for i in workers_to_wake:
            worker_scheduled[i] = True
        main_event.set()
        main_barrier.enter() # wait for all workers
        main_event.reset()

def worker_thread(i):
    while True:
       main_event.wait()
       if worker_scheduled[i]:
            worker_scheduled[i] = False
            ...do some work...
       main_barrier.enter() # report finished for this frame.
       main_event.reset() # to catch the case that a worker is scheduled before the main thread

由于没有对 worker_scheduled 数组的明确监管,因此这是一个更加脆弱的解决方案。

因此,如果我必须从 CPU 中挤出最后一盎司的处理,我个人只会使用它,但听起来这正是您正在寻找的。

于 2016-06-23T00:05:57.553 回答
1

当唤醒算法复杂度为 O(n) 时,使用多个同步对象(信号量)是不可能的。但是,有几种方法可以解决它。

一次性释放

我不确定 Python 是否有必要的方法(你的问题是 Python 特有的吗?),但一般来说,信号量的操作带有参数指定递减/递增的数量。因此,您只需将所有线程放在同一个信号量上并将它们全部唤醒。类似的方法是使用条件变量并通知所有

事件循环

如果您仍然希望能够单独控制每个线程,但喜欢使用一对多通知的方法,请尝试异步 I/O 库libuv及其 Python 对应项)。在这里,您可以创建一个事件来一次唤醒所有线程,并为每个线程创建其单独的事件,然后在每个线程的事件循环中等待两个(或多个)事件对象。另一个库是在 pthread 的条件变量之上pevents实现的。WaitForMultipleObjects

代表醒来

另一种方法是用树状算法( O(log n) )替换您的 O(n) 算法,其中每个线程仅唤醒固定数量的其他线程,但将它们委托给唤醒其他线程。在边缘情况下,主线程只能唤醒另一个线程,这将唤醒其他人或启动链式反应。如果您想以其他线程的唤醒延迟为代价来减少主线程的延迟,它会很有用。

于 2016-06-20T20:13:00.490 回答
1

读/写锁

我通常在 POSIX 系统上用于一对多关系的解决方案是读/写锁。令我惊讶的是它们不是一个完整的通用语言,但大多数语言要么实现一个版本,要么至少有一个包可用于在任何存在的原语上实现它们,例如 python 的prwlock

from prwlock import RWLock

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_lock = RWLock()

    while True:
        main_lock.acquire_write()
        ...do some work...   
        workers_to_wake = foo()
        # The above acquire could be moved as low as here,
        # depending on how independent the above processing is..            
        for i in workers_to_wake:
            worker_semaphore[i].increment() # wake up worker n

        main_lock.release()


def worker_thread(i):
    while True:
        worker_semaphore(i).decrement() # wait to be woken
        main_lock.acquire_read()
        ...do some work...
        main_lock.release() # report done with step

障碍

屏障似乎是 Python 最接近预期的内置机制,用于阻止所有线程直到它们都被警告,但是:

  1. 它们是一个非常不寻常的解决方案,因此它们会使您的代码/体验更难翻译成其他语言。

  2. 我不想在这种要唤醒的线程数不断变化的情况下使用它们。鉴于您的 n 听起来很小,我很想使用常量Barrier(n)并通知所有线程以检查它们是否正在运行此循环。但:

  3. 我担心使用屏障会适得其反,因为任何被外部事物阻碍的线程都会阻碍它们,甚至具有资源依赖性提升的调度程序也可能会错过这种关系。需要所有 n 才能到达障碍物只会使情况变得更糟。

于 2016-06-21T22:58:20.550 回答
0

Peter Brittain 的解决方案,加上 Anton 提出的“树状唤醒”的建议,让我想到了另一个解决方案:链式唤醒。基本上,不是主线程做所有的唤醒,而是只唤醒一个线程;然后每个线程负责唤醒下一个线程。这里优雅的一点是只有一个挂起的线程准备运行,因此线程很少最终切换内核。事实上,这适用于严格的处理器关联,即使其中一个工作线程与主线程共享关联。

我做的另一件事是使用一个原子计数器,工作线程在睡觉前递减;这样,只有最后一个唤醒主线程,所以主线程也没有机会被唤醒几次只是为了做更多的信号量等待。

workers_to_wake = []
main_semaphore = semaphore(0)
num_woken_workers = atomic_integer()

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_semaphore = semaphore(0)

    while True:
        ...do some work...

        workers_to_wake = foo()
        num_woken_workers.atomic_set(len(workers_to_wake)) # set completion countdown
        one_to_wake = workers_to_wake.pop()
        worker_semaphore[one_to_wake].increment() # wake the first worker
        main_semaphore.decrement() # wait for all workers

def worker_thread(i):
    while True:
        worker_semaphore[i].decrement() # wait to be woken
        if workers_to_wake.len() > 0: # more pending wakeups
            one_to_wake = workers_to_wake.pop()
            worker_semaphore[one_to_wake].increment() # wake the next worker

        ...do some work...

        if num_woken_workers.atomic_decrement() == 0: # see whether we're the last one
            main_semaphore.increment() # report all done with step
于 2016-06-27T19:17:07.430 回答