4

我使用std::threadgcc 作为我的编译器来实现Cormen 的算法简介中所述的并行合并。

我想我得到了工作的代码。它传递所有不太大的随机种子数组。但是,当我尝试合并两个大数组(每个数组 1e6 个元素)时,我得到以下终止:

terminate called without an active exception
terminate called recursively
terminate called recursively

使用 gdb 没有帮助:它在运行期间被损坏。

我很确定由于产生了太多线程而导致运行失败。

我能做些什么来确认这个错误是由于产生了太多的 std::threads 造成的?

笔记

  1. 代码工作到 n=1e4,失败 n=1e5
  2. 如果您想查看输出,请定义 DEBUG,但我不建议这样做,除非像 10 或 50 这样的小 n。

  3. STRBUF_SIZE/fprintf 的使用是丑陋的,但 iostream 在线程中不能很好地刷新 - 这是 hacky,但有效(无需在这里关注)。
  4. 我尝试通过在线程周围使用 try/catch 块来遵循 Barnes53 的建议,但这显然不起作用。
  5. 我知道产生大量线程是一件坏事——在这一点上,我只是试图实现书中的内容,看看它是否有效,也许会发现它的局限性。

更新

  1. GManNickG 在下面的回答有所帮助:并非每次运行,但在 1e5 的某些运行期间,我可以看到,资源确实消失了。
  2. 我可能会研究某种 k 路并行排序,如果该算法不可挽救,我可以在其中控制产生的线程数。

代码

#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>

#define STRBUF_SIZE 1024

class Random
{
public:
    Random( unsigned int seed=::time(nullptr))
        : m_seed( seed )
    { }
    // between [ 0 .. n-1 ]
    unsigned int rand_uint( unsigned int n )
    {
        return static_cast<unsigned int>
                     (static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
    }
    unsigned int getSeed() const { return m_seed; }
private:
    unsigned int m_seed;
};

template<typename T>
char* dump( char* line, T it1, T it2 )
{
    char buf[80];
    line[0] = '\0';
    for( T it=it1; it!=it2; ++it )
    {
        sprintf( buf, "%u ", *it );
        strcat(  line, buf );
    }
    return line;
}

template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
    auto low  = beg;
    auto high = std::max( beg, end );   // end+1
    while( low < high )
    {
        auto mid = low + std::distance( low, high ) / 2;
        if ( value <= *mid )
            high = mid;
        else
            low = mid + 1;
    }
    return high;
}

template< class InputIt, class OutputIt >
void p_merge( 
    char const*  msg, 
    unsigned     depth,
    unsigned     parent_lvl_id,
    unsigned     lr,
    InputIt  p1, InputIt  r1, 
    InputIt  p2, InputIt  r2, 
    OutputIt p3, OutputIt r3
    )
{
#ifdef DEBUG
    char buff[STRBUF_SIZE];
#endif
    unsigned sum_prev  = pow( 2, depth ) - 1;
    unsigned lvl_id    = 2*parent_lvl_id + lr;
    unsigned thread_no = sum_prev + lvl_id + 1;

    unsigned limit0    = sum_prev + 1;
    unsigned limit1    = pow( 2, depth+1 ) - 1;

#ifdef DEBUG
    char msg_dep[256];
    sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
    fprintf( stderr, "%s\n", msg_dep );
#endif

    if ( thread_no<limit0 || thread_no>limit1 )
    {
        fprintf( stderr, "OUT OF BOUNDS\n" );
        exit( 1 );
    }

    auto n1 = std::distance( p1, r1 );
    auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
    fprintf( stderr, "%s dist[v1]=%2ld   : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
    fprintf( stderr, "%s dist[v2]=%2ld   : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
    if ( n1<n2 )
    {
        std::swap( p1, p2 );
        std::swap( r1, r2 );
        std::swap( n1, n2 );
#ifdef DEBUG
      fprintf( stderr, "%s swapped[v1]   : %s\n", msg_dep, dump( buff, p1, r1 ));
      fprintf( stderr, "%s swapped[v2]   : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
    }
    if ( n1==0 )
    {
#ifdef DEBUG
      fprintf( stderr, "%s done              \n", msg_dep );
#endif
        return;
    }
    auto q1 = p1 + n1 / 2;   // midpoint
    auto q2 = binary_search_it( p2, r2, *q1 );  // <q1   q2[q1]   >=q1
    auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
    *q3 = *q1;

#ifdef DEBUG
    fprintf( stderr, "%s q1[median]=%u  : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
    fprintf( stderr, "%s q3(copied)=%u  : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif

#ifdef DEBUG
    auto d1 = std::distance( p1,   q1-1 );
    auto d2 = std::distance( q1+1, r1   );
    fprintf( stderr, "%s q1[dist_L]=%ld  : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
    fprintf( stderr, "%s q1[dist_M]=%ld  : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif


    try {
        std::thread t1{ 
            [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
        };
        std::thread t2{ 
            [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
        };
        t1.join();
        t2.join();
    }
    catch( ... )
    {
        fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
        exit( 1 );
    }

#ifdef DEBUG
    fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}

int
main( int argv, char* argc[] )
{
    // ok up to 1e4, fails by 1e5
    unsigned n = 1e5; 
    Random   r;
    std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );

#ifdef DEBUG
    fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif

    std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
    std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );

#ifdef DEBUG
    char buff[STRBUF_SIZE];
    fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
    fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif

    std::sort( v1.begin(), v1.end() );
    std::sort( v2.begin(), v2.end() );

    p_merge( "TOP ", 0, 0, 0,
        v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );

    assert( std::is_sorted( v3.begin(), v3.end() ));

#ifdef DEBUG
    fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}
4

2 回答 2

5

您可以捕获std::system_error并检查代码是否为resource_unavailable_try_again

#include <atomic>
#include <iostream>
#include <system_error>
#include <thread>
#include <vector>

class thread_collection
{
public:
    thread_collection() :
    mStop(false)
    {}

    ~thread_collection()
    {
        clear();
    }

    template <typename Func, typename... Args>
    bool add(Func&& func, Args&&... args)
    {
        try
        {
            mThreads.emplace_back(std::forward<Func>(func),
                                  std::cref(mStop),
                                  std::forward<Args>(args)...);
        }
        catch (const std::system_error& e)
        {
            if (e.code().value() == std::errc::resource_unavailable_try_again)
                return false; // not possible to make more threads right now
            else
                throw; // something else
        }

        return true; // can keep going
    }

    void clear()
    {
        mStop = true;
        for (auto& thread : mThreads)
        {
            if (thread.joinable())
                thread.join();
        }

        mThreads.clear();
        mStop = true;
    }

    std::size_t size() const
    {
        return mThreads.size();
    }

private:
    thread_collection(const thread_collection&);
    thread_collection& operator=(const thread_collection&);

    std::atomic<bool> mStop;
    std::vector<std::thread> mThreads;
};

void worker(const std::atomic<bool>& stop)
{
    while (!stop)
        std::this_thread::yield();
}

int main()
{
    thread_collection threads;

    try
    {
        while (threads.add(worker))
            continue;

        std::cout << "Exhausted thread resources!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cout << "Stopped for some other reason: " << e.what() << std::endl;
    }

    std::cout << "Made: " << threads.size() << " threads." << std::endl;
    threads.clear();
}

(运行此操作需您自担风险!)

根据§30.3.1.2/4,这是用于指示线程创建失败的错误代码:

错误条件:
resource_unavailable_try_again — 系统缺乏创建另一个线程所需的资源,或者系统对进程中的线程数施加的限制将被超过。

请注意,这可能是由于您自己的参数被复制到结果线程而引发的。为了保证不会发生这种情况,您需要预先构建您的参数,然后将它们不扔到您的线程函数中。

也就是说,无论如何你最好限制线程创建。运行的线程多于内核可以执行的线程是没有意义的。用于std::thread::hardware_concurrency获取该号码。

于 2013-01-18T23:06:57.277 回答
2
try {
    std::thread t1{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    std::thread t2{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
    t1.join();
    t2.join();
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

此代码可能无法达到您的预期。如果构造t2引发异常,t1则将被销毁,但由于该线程是可连接的,它将调用std::terminate(),因此您catch将不会处理该异常。

您看到的一个原因terminate called recursively可能是许多线程同时遇到相同的问题,因此许多线程terminate()在相似的时间调用。

这将改为:

std::thread t1;
std::thread t2;

try {
    t1 = std::thread{ 
        [&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1,   p2, q2,   p3, r3 ); } 
    };
    t2 = std::thread{ 
        [&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); } 
    };
}
catch( ... )
{
    fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
    exit( 1 );
}

t1.join();
t2.join();

我不认为这是一个问题,因为唯一p_merge可以抛出的地方是在一个try块内,但你也应该知道,如果一个异常离开运行的函数,std::thread它也会调用std::terminate(),所以这不是你想要的那么你应该将noexcept函数(或只是非抛出函数)传递给std::thread.

于 2013-01-19T16:33:28.633 回答