5

我有以 C 语言编写的无锁队列,以链表的形式包含来自多个线程的请求,这些请求发布到单个线程并在单个线程中处理。经过几个小时的压力,我最终让最后一个请求的下一个指针指向自身,这会创建一个无限循环并锁定处理线程。

该应用程序在 Linux 和 Windows 上都运行(并且失败)。我在 Windows 上调试,我COMPARE_EXCHANGE_PTR映射到InterlockedCompareExchangePointer

这是将请求推送到列表头部的代码,并从多个线程调用:

void push_request(struct request * volatile * root, struct request * request)
{
    assert(request);

    do {
        request->next = *root;
    } while(COMPARE_EXCHANGE_PTR(root, request, request->next) != request->next);
}

这是从列表末尾获取请求的代码,仅由处理它们的单个线程调用:

struct request * pop_request(struct request * volatile * root)
{
    struct request * volatile * p;
    struct request * request;

    do {
        p = root;
        while(*p && (*p)->next) p = &(*p)->next; // <- loops here
        request = *p;
    } while(COMPARE_EXCHANGE_PTR(p, NULL, request) != request);

     assert(request->next == NULL);

     return request;
}

请注意,我没有使用尾指针,因为我想避免在push_request. 但是我怀疑问题可能出在我找到列表末尾的方式上。

有几个地方将请求推送到队列中,但它们通常看起来像这样:

// device->requests is defined as struct request * volatile requests;
struct request * request = malloc(sizeof(struct request));
if(request) {
    // fill out request fields
    push_request(&device->requests, request);
    sem_post(device->request_sem);
}

处理请求的代码不仅如此,但本质上是在循环中执行此操作:

if(sem_wait_timeout(device->request_sem, timeout) == sem_success) {
    struct request * request = pop_request(&device->requests);
    // handle request
    free(request);
}

我还刚刚添加了一个在每次操作之前和之后检查重复列表的功能,但我担心这个检查会改变时间,所以我永远不会遇到它失败的地方。(我正在等待它在我写这篇文章时打破。)

当我中断挂起程序时,处理程序线程在pop_request标记位置循环。我有一个或多个请求的有效列表,最后一个的 next 指针指向它自己。请求队列通常很短,我从未见过超过 10 个,只有 1 和 3 次我可以在调试器中查看此故障。

我尽可能多地考虑了这一点,得出的结论是,除非我两次推送相同的请求,否则我永远无法在列表中出现循环。我很确定这永远不会发生。我也相当确定(尽管不完全)这不是ABA 问题

我知道我可能会同时弹出多个请求,但我相信这在这里无关紧要,而且我从未见过这种情况发生。(我也会解决这个问题)

我对如何破坏我的功能进行了长期而艰苦的思考,但我没有看到以循环结束的方法。

所以问题是:有人能找到一种方法来破坏它吗?有人可以证明这不能吗?

最终我会解决这个问题(也许通过使用尾指针或其他解决方案 - 锁定将是一个问题,因为发布的线程不应该被锁定,但我手头确实有一个 RW 锁)但我想确保更改列表实际上解决了我的问题(而不是因为时间不同而不太可能)。

4

1 回答 1

8

这很微妙,但你确实有一个竞争条件。

从一个包含一个元素的列表开始,req1. 所以我们有:

device->requests == req1;
req1->next == NULL;

现在,我们推送一个新元素req2,同时尝试弹出队列。推动req2首先开始。while 循环体运行,所以我们现在有:

device->requests == req1;
req1->next == NULL;
req2->next == req1;

然后COMPARE_EXCHANGE_PTR运行,所以我们有:

device->requests == req2;
req1->next == NULL;
req2->next == req1;

...和COMPARE_EXCHANGE_PTR()回报req1。现在,此时,在比较while条件之前,推送被中断,弹出开始运行。

弹出正确运行到完成,弹出req1- 这意味着我们有:

device->requests == req2;
req2->next == NULL;

推送重新开始。它现在获取request->next进行比较 - 它获取 的req2->next,即NULL. 它req1与比较NULL,比较成功,while 循环再次运行,现在我们有:

device->requests == req2;
req2->next == req2;

这次测试失败,while 循环退出,你有了你的循环。


这应该解决它:

void push_request(struct request * volatile * root, struct request * request)
{
    struct request *oldroot;

    assert(request);

    do {
        request->next = oldroot = *root;
    } while(COMPARE_EXCHANGE_PTR(root, request, oldroot) != oldroot);
}
于 2010-04-27T06:07:55.727 回答