10

std::list<Info> infoList的应用程序中有两个线程之间共享。这 2 个线程正在访问此列表,如下所示:

线程 1:使用push_back(),pop_front()clear()在列表中(视情况而定) 线程 2:使用 aniterator遍历列表中的项目并执行一些操作。

线程 2 正在迭代列表,如下所示:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i)
{
  DoAction(i);
}

代码使用 GCC 4.4.2 编译。

有时 ++i 会导致段错误并使应用程序崩溃。该错误是在 std_list.h 第 143 行的以下行中引起的:

_M_node = _M_node->_M_next;

我想这是一个赛车条件。当线程 2 对其进行迭代时,该列表可能已被线程 1 更改甚至清除。

我使用 Mutex 来同步对这个列表的访问,并且在我的初始测试中一切正常。但是系统只是在压力测试下冻结,使得这个解决方案完全不可接受。这个应用程序是一个实时应用程序,我需要找到一个解决方案,以便两个线程都可以尽可能快地运行,而不会影响应用程序的总吞吐量。

我的问题是:线程 1 和线程 2 需要尽快执行,因为这是一个实时应用程序。我可以做些什么来防止这个问题并仍然保持应用程序性能?是否有任何无锁算法可用于此类问题?

如果我在线程 2 的迭代中错过了一些新添加Info的对象,那没关系,但我能做些什么来防止迭代器成为悬空指针?

谢谢

4

12 回答 12

5

您的 for() 循环可能会保持锁定相对较长的时间,具体取决于它迭代的元素数量。如果它“轮询”队列,不断检查是否有新元素可用,您可能会遇到真正的麻烦。这使得线程拥有互斥锁的时间过长,生产者线程很少有机会闯入并添加元素。并在此过程中消耗大量不必要的 CPU 周期。

您需要一个“有界阻塞队列”。不要自己写,锁的设计不是小菜一碟。很难找到好的示例,其中大部分是 .NET 代码。 这篇文章看起来很有希望。

于 2010-01-16T11:55:09.663 回答
4

一般来说,以这种方式使用 STL 容器是不安全的。您将必须实现特定的方法以使您的代码线程安全。您选择的解决方案取决于您的需求。我可能会通过维护两个列表来解决这个问题,每个线程中一个。并通过无锁队列传达更改(在此问题的评论中提到)。您还可以通过将它们包装在 boost::shared_ptr 中来限制 Info 对象的生命周期,例如

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList;

enum CommandValue
{
    Insert,
    Delete
}

struct Command
{
    CommandValue operation;
    InfoReference reference;
}

typedef LockFreeQueue<Command> CommandQueue;

class Thread1
{
    Thread1(CommandQueue queue) : m_commands(queue) {}
    void run()
    {
        while (!finished)
        {
            //Process Items and use 
            // deleteInfo() or addInfo()
        };

    }

    void deleteInfo(InfoReference reference)
    {
        Command command;
        command.operation = Delete;
        command.reference = reference;
        m_commands.produce(command);
    }

    void addInfo(InfoReference reference)
    {
        Command command;
        command.operation = Insert;
        command.reference = reference;
        m_commands.produce(command);
    }
}

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   

class Thread2
{
    Thread2(CommandQueue queue) : m_commands(queue) {}

    void run()
    {
        while(!finished)
        {
            processQueue();
            processList();
        }   
    }

    void processQueue()
    {
        Command command;
        while (m_commands.consume(command))
        {
            switch(command.operation)
            {
                case Insert:
                    m_infoList.push_back(command.reference);
                    break;
                case Delete:
                    m_infoList.remove(command.reference);
                    break;
            }
        }
    }

    void processList()
    {
        // Iterate over m_infoList
    }

private:
    CommandQueue& m_commands;
    InfoList m_infoList;
}   


void main()
{
CommandQueue commands;

Thread1 thread1(commands);
Thread2 thread2(commands);

thread1.start();
thread2.start();

waitforTermination();

}

这还没有编译。您仍然需要确保对Info对象的访问是线程安全的。

于 2010-01-16T13:20:29.090 回答
3

我想知道这个列表的目的是什么,这样会更容易回答这个问题。

正如Hoare 所说,尝试共享数据以在两个线程之间进行通信通常是一个坏主意,而应该进行通信以共享数据:即消息传递。

例如,如果此列表正在对队列建模,您可能只需使用多种方式之一在两个线程之间进行通信(例如套接字)。消费者/生产者是一个标准且众所周知的问题。

如果您的物品很贵,那么只在通信过程中传递指针,您将避免复制物品本身。

一般来说,共享数据非常困难,尽管不幸的是,这是我们在学校听到的唯一编程方式。通常只有通信“通道”的低级实现才应该担心同步,您应该学习使用通道进行通信,而不是试图模仿它们。

于 2010-01-16T13:41:08.513 回答
1

为了防止您的迭代器失效,您必须锁定整个for循环。现在我猜第一个线程可能难以更新列表。我会尝试给它一个机会在每次(或每 N 次迭代)上完成它的工作。

在看起来像这样的伪代码中:

mutex_lock();
for(...){
  doAction();
  mutex_unlock();
  thread_yield();  // give first thread a chance
  mutex_lock();
  if(iterator_invalidated_flag) // set by first thread
    reset_iterator();
}
mutex_unlock();
于 2010-01-16T11:32:36.930 回答
1

您必须决定哪个线程更重要。如果是更新线程,那么它必须通知迭代器线程停止、等待并重新开始。如果它是迭代器线程,它可以简单地锁定列表,直到迭代完成。

于 2010-01-16T11:44:59.050 回答
1

正如您提到的,您不在乎您的迭代消费者是否错过了一些新添加的条目,您可以在下面使用写时复制列表。这允许迭代消费者在列表首次启动时对其一致的快照进行操作,而在其他线程中,对列表的更新会产生新的但一致的副本,而不会干扰任何现存的快照。

这里的交易是列表的每次更新都需要锁定足够长的独占访问以复制整个列表。这种技术偏向于拥有许多并发阅读器和不太频繁的更新。

尝试向容器添加内在锁定首先需要您考虑哪些操作需要在原子组中运行。例如,在尝试弹出第一个元素之前检查列表是否为空需要原子的pop-if-not-empty操作;否则,当呼叫者收到答案并尝试对其采取行动时,列表为空的答案可能会发生变化。

在上面的示例中不清楚迭代必须遵守什么保证。列表中的每个元素最终都必须被迭代线程访问吗?它可以多次通过吗?一个线程在另一个线程DoAction()针对它运行时从列表中删除一个元素是什么意思?我怀疑解决这些问题会导致重大的设计变更。


您正在使用 C++,并且您提到需要一个带有pop-if-not-empty操作的队列。多年前,我使用ACE 库的并发原语编写了一个双锁队列,因为Boost 线程库尚未准备好用于生产使用,而 C++ 标准库包含此类设施的机会是一个遥远的梦想。将它移植到更现代的东西会很容易。

我的这个队列——叫做concurrent::two_lock_queue——只允许通过 RAII 访问队列的头部。这确保了获取锁以读取头部将始终与锁的释放配合。消费者构造一个const_front(对头元素的常量访问)、一个front(对头元素的非常量访问)或一个renewable_front(对头和后继元素的非常量访问)对象来表示访问队列头元素的独占权. 这样的“正面”对象是无法复制的。

Classtwo_lock_queue还提供了一个pop_front()函数,该函数等待至少一个元素可以被删除,但是,为了与不混合容器突变和值复制的std::queue's 和' 风格保持一致,返回 void。std::stackpop_front()

在伴随文件中,有一个名为 的类型concurrent::unconditional_pop,它允许通过 RAII 确保队列的头元素将在退出当前范围时弹出。

伴随文件error.hh定义了使用函数产生的异常two_lock_queue::interrupt(),用于解除阻塞等待访问队列头部的线程。

查看代码,如果您需要更多关于如何使用它的解释,请告诉我。

于 2010-01-16T15:27:17.893 回答
1

最好的方法是使用内部同步的容器。TBB 和微软的 concurrent_queue 做到了这一点。Anthony Williams 在他的博客上也有一个很好的实现

于 2010-01-16T21:18:36.640 回答
1

其他人已经提出了无锁替代方案,所以我会回答,就好像你被锁卡住了......

当您修改列表时,现有迭代器可能会变得无效,因为它们不再指向有效内存(列表在需要增长时会自动重新分配更多内存)。为了防止无效的迭代器,您可以在消费者遍历列表时使生产者阻塞在互斥体上,但这无需等待生产者。

queue<Info>::pop_front()如果您使用队列而不是列表,并且让您的消费者使用同步调用而不是可以在您背后失效的迭代器,那么生活会更轻松。如果您的消费者真的需要Info一次吞食大块,那么使用一个条件变量,这将使您的消费者阻塞 until queue.size() >= minimum

Boost库有一个很好的条件变量可移植实现(甚至适用于旧版本的 Windows),以及通常的线程库的东西

对于使用(老式)锁定的生产者-消费者队列,请查看ZThreads库的BlockingQueue模板类。我自己没有使用过 ZThreads,担心缺少最近的更新,并且因为它似乎没有被广泛使用。但是,我将它用作滚动我自己的线程安全生产者-消费者队列的灵感(在我了解无锁队列和TBB之前)。

Boost 审查队列中似乎有一个无锁队列/堆栈库。让我们希望我们在不久的将来看到一个新的 Boost.Lockfree!:)

如果有兴趣,我可以编写一个使用 std::queue 和 Boost 线程锁定的阻塞队列示例。

编辑

Rick 的答案引用的博客已经有一个使用 std::queue 和 Boost condvars 的阻塞队列示例。如果您的消费者需要吞噬块,您可以扩展示例如下:

void wait_for_data(size_t how_many)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.size() < how_many)
        {
            the_condition_variable.wait(lock);
        }
    }

您可能还想调整它以允许超时和取消。

你提到速度是一个问题。如果您Info的 s 是重量级的,您应该考虑将它们传递给shared_ptr. 您还可以尝试使您Info的 s 固定大小并使用内存池(这可能比堆快得多)。

于 2010-01-17T04:18:09.153 回答
1

如果您使用的是 C++0x,您可以通过这种方式在内部同步列表迭代:

假设该类有一个名为 objects_ 的模板列表和一个名为 mutex_ 的 boost::mutex

toAll 方法是列表包装器的成员方法

 void toAll(std::function<void (T*)> lambda)
 {
 boost::mutex::scoped_lock(this->mutex_);
 for(auto it = this->objects_.begin(); it != this->objects_.end(); it++)
 {
      T* object = it->second;
      if(object != nullptr)
      {
                lambda(object);
           }
      }
 }

来电:

synchronizedList1->toAll(
      [&](T* object)->void // Or the class that your list holds
      {
           for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++)
           {
                // Do something
           }
      }
 );
于 2010-11-30T09:47:48.087 回答
0

您必须使用一些线程库。如果您使用的是 Intel TBB,则可以使用 concurrent_vector 或 concurrent_queue。看到这个

于 2010-01-16T12:01:05.347 回答
0

如果您想继续std::list在多线程环境中使用,我建议将其包装在一个带有互斥锁的类中,该互斥锁提供对其的锁定访问。根据具体用法,切换到事件驱动的队列模型可能是有意义的,其中消息在多个工作线程正在使用的队列上传递(提示:生产者-消费者)。

我会认真考虑马蒂厄的想法。使用多线程编程解决的许多问题最好使用线程或进程之间的消息传递来解决。如果您需要高吞吐量并且不绝对要求处理共享相同的内存空间,请考虑使用消息传递接口 (MPI)之类的东西,而不是滚动您自己的多线程解决方案。有一堆可用的 C++ 实现 - OpenMPIBoost.MPIMicrosoft MPI等。

于 2010-01-16T15:01:40.770 回答
-1

在这种情况下,我认为您根本无法在没有任何同步的情况下逃脱,因为某些操作会使您正在使用的迭代器无效。使用列表,这是相当有限的(基本上,如果两个线程都试图同时将迭代器操作到同一个元素),但仍然存在在尝试的同时删除元素的危险附加一个。

你有机会把锁放在一边DoAction(i)吗?您显然只想在可以逃脱的绝对最短时间内保持锁定,以最大限度地提高性能。从上面的代码中,我认为您需要稍微分解循环以加快操作的双方。

类似于以下内容:

while (processItems) {
  Info item;
  lock(mutex);
  if (!infoList.empty()) {
     item = infoList.front();
     infoList.pop_front();
  }
  unlock(mutex);
  DoAction(item);
  delayALittle();
}

并且插入函数仍然必须看起来像这样:

lock(mutex);
infoList.push_back(item);
unlock(mutex);

除非队列可能很大,否则我很想使用 astd::vector<Info>甚至 a之类的东西std::vector<boost::shared_ptr<Info> >来最小化 Info 对象的复制(假设与 boost::shared_ptr 相比,复制这些对象的成本更高。通常,对向量的操作往往比对列表的操作要快一些,尤其是当存储在向量中的对象很小且复制成本低时。

于 2010-01-16T11:38:45.173 回答