5

故事

有一个作家线程,定期从某个地方收集数据(实时,但这在问题中并不重要)。然后有很多读者从这些数据中阅读。通常的解决方案是使用两个读写器锁和两个缓冲区,如下所示:

Writer (case 1):
acquire lock 0                        
loop
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

或者

Writer (case 2):
acquire lock 0                        
loop
    acquire other lock
    free this lock
    swap buffers
    write to current buffer
    wait for next period

问题

在这两种方法中,如果获取其他锁操作失败,则不会进行任何交换,并且写入器将覆盖其先前的数据(因为写入器是实时的,它无法等待读取器)所以在这种情况下,所有读取器都会丢失该帧数据的。

不过这没什么大不了的,阅读器是我自己的代码,而且它们很短,所以使用双缓冲,这个问题就解决了,如果有问题,我可以让它成为三缓冲(或更多)。

问题是我想尽量减少延迟。想象案例1:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up, and again writes to buffer0

在**此时**,理论上其他读者可以读取数据,buffer0只要作者可以在读者完成后进行交换而不是等待下一个周期。在这种情况下发生的情况是,仅仅因为一个阅读器迟到了,所有阅读器都错过了一帧数据,而这个问题本来可以完全避免。

案例2类似:

writer writes to buffer0                reader is idle
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)
|
|                                       reader starts reading buffer1
writer wakes up                         |
it can't acquire lock1                  because reader is still reading buffer1
overwrites buffer0

我尝试混合解决方案,因此作者在写入后立即尝试交换缓冲区,如果不可能,则在下一个时期醒来后立即交换。所以是这样的:

Writer (case 3):
acquire lock 0                        
loop
    if last buffer swap failed
        acquire other lock
        free this lock
        swap buffers
    write to current buffer
    acquire other lock
    free this lock
    swap buffers
    wait for next period

现在延迟问题仍然存在:

writer writes to buffer0                reader is reading buffer1
writer can't acquire lock1              because reader is still reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      <- **this point**
|
|
writer wakes up
swaps buffers
writes to buffer1

同样在**此时**,所有读者都可以开始阅读buffer0,这是在写入后的短暂延迟buffer0,但他们必须等到写入器的下一个周期。

问题

问题是,我该如何处理?如果我希望编写器在所需的时间段精确执行,则需要使用 RTAI 函数等待该时间段,我不能这样做

Writer (case 4):
acquire lock 0                        
loop
    write to current buffer
    loop a few times or until the buffer has been swapped
        sleep a little
        acquire other lock
        free this lock
        swap buffers
    wait for next period

这引入了抖动。因为“几次”可能碰巧比“等待下一个时期”更长,所以作者可能会错过其时期的开始。

为了更清楚,这就是我想要发生的事情:

writer writes to buffer0                reader is reading buffer1
|                                       |
|                                       reader finishes reading,
| (writer waiting for next period)      As soon as all readers finish reading,
|                                         the buffer is swapped
|                                       readers start reading buffer0
writer wakes up                         |
writes to buffer1

我已经找到的

我发现read-copy-update据我了解,它一直为缓冲区分配内存并释放它们,直到读者完成它们,这对我来说是不可能的,原因有很多。一,线程在内核和用户空间之间共享。其次,使用 RTAI,您不能在实时线程中分配内存(因为那样您的线程将调用 Linux 的系统调用,从而破坏实时性!(更不用说使用 Linux 自己的 RCU 实现了,因为出于同样的原因)

我还考虑过有一个额外的线程以更高的频率尝试交换缓冲区,但这听起来不是一个好主意。首先,它本身需要与 writer 同步,其次,我有许多这样的 writer-readers 并行工作在不同的部分,每个 writer 一个额外的线程似乎太多了。在与每个作者同步方面,所有作者的一个线程似乎非常复杂。

4

5 回答 5

3

您使用什么 API 来处理读写器锁?你有一个定时锁,比如pthread_rwlock_timedwrlock吗?如果是,我认为它可以解决您的问题,如以下代码所示:

void *buf[2];

void
writer ()
{
  int lock = 0, next = 1;

  write_lock (lock);
  while (1)
    {
      abs_time tm = now() + period;

      fill (buf [lock]);
      if (timed_write_lock (next, tm))
        {
          unlock (lock);
          lock = next;
          next = (next + 1) & 1;
        }
      wait_period (tm);
    }
}


void
reader ()
{
  int lock = 0;
  while (1)
    {
      reade_lock (lock);
      process (buf [lock]);
      unlock (lock);
      lock = (lock + 1) & 1;
    }
}

这里发生的情况是,写入器是等待锁定还是等待下一个周期并不重要,只要它肯定会在下一个周期到来之前醒来。绝对超时确保了这一点。

于 2011-11-10T09:27:50.517 回答
1

这不正是三重缓冲应该解决的问题吗?所以你有 3 个缓冲区,我们称它们为 write1、write2 和 read。写入线程在写入 write1 和 write2 之间交替,确保它们永远不会阻塞,并且最后一个完整的帧始终可用。然后在读取线程中,在某个适当的时间点(例如,就在读取一帧之前或之后),读取缓冲区与可用的写入缓冲区一起翻转。

虽然这将确保写入器永远不会阻塞(缓冲区翻转可以通过翻转两个指针非常快速地完成,甚至可能使用 CAS 原子而不是锁),但仍然存在读取器必须等待其他读取器完成的问题在翻转之前使用读取缓冲区。我想这可以通过一个可以翻转可用的读取缓冲区池来稍微解决 RCU 式的问题。

于 2011-10-18T13:02:02.797 回答
1
  • 使用队列(FIFO 链表)
  • 实时写入器将始终追加(入队)到队列的末尾
  • 读者总是从队列的开头移除(出队)
  • 如果队列为空,读者将阻塞

编辑以避免动态分配

我可能会使用循环队列...我会使用内置的 __sync 原子操作。 http://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Atomic-Builtins.html#Atomic-Builtins

  • 循环队列(FIFO 2d 数组)
    • 例如:字节[][] 数组 = 新字节[MAX_SIZE][BUFFER_SIZE];
    • 开始和结束索引指针
  • Writer 覆盖 Array[End][] 处的缓冲区
    • 如果 Start 最终一直循环,Writer 可以递增 Start
  • Reader 从 Array[Start][] 获取缓冲区
    • 如果 Start == End 则读取器阻塞
于 2011-10-18T15:10:29.317 回答
1

如果您不希望作者等待,也许它不应该获得其他人可能持有的锁。不过,我会让它执行某种同步,以确保它真正写入的内容被写出 - 通常,大多数同步调用将导致执行内存刷新或屏障指令,但细节将取决于内存模型你的 cpu 和你的线程包的实现。

我会看看周围是否有任何其他同步原语更适合事情,但如果迫在眉睫,我会让编写者锁定并解锁其他人从未使用过的锁定。

然后,读者必须准备好时不时地错过一些东西,并且必须能够检测到他们何时错过了一些东西。我会将一个有效性标志和一个长序列计数与每个缓冲区相关联,并让编写者执行类似“清除有效性标志、递增序列计数、同步、写入缓冲区、递增序列计数、设置有效性标志、同步”之类的操作。如果阅读器读取一个序列计数,同步,看到有效性标志为真,读出数据,同步,并重新读取相同的序列计数,那么它可能有一些希望它没有得到乱码数据。

如果您要这样做,我将对其进行详尽的测试。在我看来这似乎是合理的,但它可能不适用于您对从编译器到内存模型的所有内容的特定实现。

另一个想法,或者检查这个的方法,是在你的缓冲区中添加一个校验和并最后写入。

另请参阅关于无锁算法的搜索,例如http://www.rossbencina.com/code/lockfree

为此,您可能需要一种方式让作者向熟睡的读者发出信号。您可能可以为此使用 Posix 信号量 - 例如,当特定信号量达到给定序列号或缓冲区变为有效时,让读者要求编写者在特定信号量上调用 sem_post()。

于 2011-10-18T17:56:45.660 回答
0

另一种选择是坚持锁定,但确保读者永远不会因为持有锁而挂起太久。读者可以通过在持有该锁时不做任何其他事情来保持持有锁所花费的时间短且可预测,而是从写入者的缓冲区复制数据。唯一的问题是低优先级的读取器可能会在写入的中途被更高优先级的任务中断,解决方法是http://en.wikipedia.org/wiki/Priority_ceiling_protocol

鉴于此,如果写入线程具有高优先级,则每个缓冲区要完成的最坏情况工作是写入线程填充缓冲区,并且每个读取线程将数据从该缓冲区复制到另一个缓冲区。如果您在每个周期中都能负担得起,那么写入器线程和一定数量的读取器数据复制将始终完成,而读取器处理他们复制的数据可能会也可能不会完成他们的工作。如果他们不这样做,他们就会落后,并且在他们下次抓住锁并环顾四周以查看他们想要复制的缓冲区时会注意到这一点。

FWIW,我阅读实时代码的经验(当需要显示错误存在,而不是在我们的代码中时)是它令人难以置信且故意简单,布局非常清晰,并且不一定比它更有效需要在最后期限之前完成,因此如果您负担得起的话,一些显然毫无意义的数据复制以直接锁定工作可能是一个不错的选择。

于 2011-10-19T18:16:53.920 回答