2

我有一个正在由多个任务处理的对象。我多次复制这个对象并存储在一个任务的向量中,以检索它自己的副本以在 parallel_for 循环中工作。下面是带有标准向量的代码。

这个想法是我从一个大小为 0 的向量开始,并根据并行启动并需要自己的副本的任务数量来增长它。我使用原子“_poolIndex”来跟踪每次运行的全局索引。

    Object& GetObject()
    {
        if (_poolIndex >= _objectPool.size())
        {
            lock_guard<mutex> lock(_mutex);
            Object copy(_original);
            _objectPool.push_back(move(copy));
        }

        int taskIndex = _poolIndex.fetch_add(1);

        return _objectPool[taskIndex];
    }

我在向量类的以下代码中得到索引超出范围,即使当调试器中断时 position < size :

reference operator[](size_type _Pos)
{   // subscript mutable sequence
        #if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
    {   // report error
    _DEBUG_ERROR("vector subscript out of range");
    _SCL_SECURE_OUT_OF_RANGE;
    }

所以很明显,检索 size() <= _Pos 的部分已经评估了一些不同的东西......我很困惑,因为我有一个推到向量上的锁。

然后我尝试了 concurrent_vector 并且 push_back 给了我编译问题,这是 Visual Studio 2013 错误:

错误 35 错误 C2059:语法错误:'&' c:\program files (x86)\microsoft visual studio 12.0\vc\include\concurrent_vector.h 1492 1 UnitTests

错误 36 错误 C2143:语法错误:缺少 ';' 在')'之前 c:\program files (x86)\microsoft visual studio 12.0\vc\include\concurrent_vector.h 1492 1 UnitTests

在 concurrent_vector 类中,当我将 _objectPool 从 vector 切换为 concurrent_vector 时,以下代码给出了问题:

    void _Init(const void *_Src)
    {
        for(; _I < _N; ++_I)
            new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
    }

如果有人可以就上述两个问题提供指导,那就太好了。

我还试图最小化关键部分以提高效率。这个想法是在启动算法并多次运行之后,_objectPool 将拥有大部分(如果不是全部)副本已经推送到向量上。

4

1 回答 1

1

问题

首先,存在数据竞争,因为从_poolIndex(iniftaskIndex) 读取的两个值不同步。交换它们并taskIndex在条件下使用,而不是再次读取共享状态。

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);
    if (taskIndex >= _objectPool.size())    // issue #2: size() is not thread-safe
    {
        lock_guard<mutex> lock(_mutex);
        //This: Object copy(_original);
        //      _objectPool.push_back(move(copy));
        // can be simplified to:
        _objectPool.push_back(_original); // issue #3: it can push at different index
    }

    return _objectPool[taskIndex];
}

std::vector在某些情况下,第二个问题可能不可见。但它肯定会破坏 concurrent_vector 的使用(请参阅原因)。

第三个问题是它taskIndex与锁定顺序不同步,因此它可以构造一个对象但返回尚未构造或分配的对象(超出范围)。

正确的方式

如果我正确理解您的意图,您希望将第一次创建的对象重用于第二次,并在需要时创建更多对象。我将尝试解决以下代码中的问题:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);                // get current index in the pool
    if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
    {
        lock_guard<mutex> lock(_mutex);
        size_t sz = _objectPoolSize.load(memory_order_relaxed);
        if (taskIndex >= sz) {                              // double-check under the lock
            sz *= 2;                  // or any other factor, get a bunch of new objects at once
            _objectPool.resize(sz, _original);              // construct new copies of _original
            _objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
        }
    }
    return _objectPool[taskIndex];
}

并发向量

至于 concurrent_vector (在中都可用),您可能希望使用grow_to_at_least来消除锁定..但是:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
    // construct new copies of _original if free objects is about to ran out
    _objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
    return _objectPool[taskIndex];  // ISSUE: it might not be constructed yet in other thread
}

它遇到与相同的问题size()。因此,它要么需要与 _objectPoolSize 进行一些同步,要么需要与基于零填充分配的构造函数进行逐项同步,如同一博客中所述。

于 2014-11-15T16:59:07.107 回答