我使用std::thread
gcc 作为我的编译器来实现Cormen 的算法简介中所述的并行合并。
我想我得到了工作的代码。它传递所有不太大的随机种子数组。但是,当我尝试合并两个大数组(每个数组 1e6 个元素)时,我得到以下终止:
terminate called without an active exception
terminate called recursively
terminate called recursively
使用 gdb 没有帮助:它在运行期间被损坏。
我很确定由于产生了太多线程而导致运行失败。
我能做些什么来确认这个错误是由于产生了太多的 std::threads 造成的?
笔记
- 代码工作到 n=1e4,失败 n=1e5
如果您想查看输出,请定义 DEBUG,但我不建议这样做,除非像 10 或 50 这样的小 n。
- STRBUF_SIZE/fprintf 的使用是丑陋的,但 iostream 在线程中不能很好地刷新 - 这是 hacky,但有效(无需在这里关注)。
- 我尝试通过在线程周围使用 try/catch 块来遵循 Barnes53 的建议,但这显然不起作用。
- 我知道产生大量线程是一件坏事——在这一点上,我只是试图实现书中的内容,看看它是否有效,也许会发现它的局限性。
更新
- GManNickG 在下面的回答有所帮助:并非每次运行,但在 1e5 的某些运行期间,我可以看到,资源确实消失了。
- 我可能会研究某种 k 路并行排序,如果该算法不可挽救,我可以在其中控制产生的线程数。
代码
#include <vector>
#include <iostream>
#include <algorithm>
#include <vector>
#include <thread>
#include <cmath>
#include <cstring>
#include <cassert>
#define STRBUF_SIZE 1024
class Random
{
public:
Random( unsigned int seed=::time(nullptr))
: m_seed( seed )
{ }
// between [ 0 .. n-1 ]
unsigned int rand_uint( unsigned int n )
{
return static_cast<unsigned int>
(static_cast<float>(n) * rand_r( &m_seed ) / RAND_MAX);
}
unsigned int getSeed() const { return m_seed; }
private:
unsigned int m_seed;
};
template<typename T>
char* dump( char* line, T it1, T it2 )
{
char buf[80];
line[0] = '\0';
for( T it=it1; it!=it2; ++it )
{
sprintf( buf, "%u ", *it );
strcat( line, buf );
}
return line;
}
template< typename T, class It >
It binary_search_it( It beg, It end, const T& value )
{
auto low = beg;
auto high = std::max( beg, end ); // end+1
while( low < high )
{
auto mid = low + std::distance( low, high ) / 2;
if ( value <= *mid )
high = mid;
else
low = mid + 1;
}
return high;
}
template< class InputIt, class OutputIt >
void p_merge(
char const* msg,
unsigned depth,
unsigned parent_lvl_id,
unsigned lr,
InputIt p1, InputIt r1,
InputIt p2, InputIt r2,
OutputIt p3, OutputIt r3
)
{
#ifdef DEBUG
char buff[STRBUF_SIZE];
#endif
unsigned sum_prev = pow( 2, depth ) - 1;
unsigned lvl_id = 2*parent_lvl_id + lr;
unsigned thread_no = sum_prev + lvl_id + 1;
unsigned limit0 = sum_prev + 1;
unsigned limit1 = pow( 2, depth+1 ) - 1;
#ifdef DEBUG
char msg_dep[256];
sprintf( msg_dep, "%s [%2d] %-10d [%d,%d]", msg, depth, thread_no, limit0, limit1 );
fprintf( stderr, "%s\n", msg_dep );
#endif
if ( thread_no<limit0 || thread_no>limit1 )
{
fprintf( stderr, "OUT OF BOUNDS\n" );
exit( 1 );
}
auto n1 = std::distance( p1, r1 );
auto n2 = std::distance( p2, r2 );
#ifdef DEBUG
fprintf( stderr, "%s dist[v1]=%2ld : %s\n", msg_dep, n1, dump( buff, p1, r1 ) );
fprintf( stderr, "%s dist[v2]=%2ld : %s\n", msg_dep, n2, dump( buff, p2, r2 ) );
#endif
if ( n1<n2 )
{
std::swap( p1, p2 );
std::swap( r1, r2 );
std::swap( n1, n2 );
#ifdef DEBUG
fprintf( stderr, "%s swapped[v1] : %s\n", msg_dep, dump( buff, p1, r1 ));
fprintf( stderr, "%s swapped[v2] : %s\n", msg_dep, dump( buff, p2, r2 ));
#endif
}
if ( n1==0 )
{
#ifdef DEBUG
fprintf( stderr, "%s done \n", msg_dep );
#endif
return;
}
auto q1 = p1 + n1 / 2; // midpoint
auto q2 = binary_search_it( p2, r2, *q1 ); // <q1 q2[q1] >=q1
auto q3 = p3 + std::distance( p1, q1 ) + std::distance( p2, q2 );
*q3 = *q1;
#ifdef DEBUG
fprintf( stderr, "%s q1[median]=%u : %s\n", msg_dep, *q1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q2[fulcrum]=%u : %s\n", msg_dep, *q2, dump( buff, p2, r2 ));
fprintf( stderr, "%s q3(copied)=%u : %s\n", msg_dep, *q3, dump( buff, p3, r3 ));
#endif
#ifdef DEBUG
auto d1 = std::distance( p1, q1-1 );
auto d2 = std::distance( q1+1, r1 );
fprintf( stderr, "%s q1[dist_L]=%ld : %s\n", msg_dep, d1, dump( buff, p1, r1 ));
fprintf( stderr, "%s q1[dist_M]=%ld : %s\n", msg_dep, d2, dump( buff, p1, r1 ));
#endif
try {
std::thread t1{
[&](){ p_merge( "LESS", depth+1, lvl_id, 0, p1, q1, p2, q2, p3, r3 ); }
};
std::thread t2{
[&](){ p_merge( "MORE", depth+1, lvl_id, 1, q1+1, r1, q2, r2, q3+1, r3 ); }
};
t1.join();
t2.join();
}
catch( ... )
{
fprintf( stderr, "OK - I am dying during a std::thread spawn\n" );
exit( 1 );
}
#ifdef DEBUG
fprintf( stderr, "%s synchronized\n", msg_dep );
#endif
}
int
main( int argv, char* argc[] )
{
// ok up to 1e4, fails by 1e5
unsigned n = 1e5;
Random r;
std::vector<unsigned> v1( n ), v2( n ), v3( 2 * n );
#ifdef DEBUG
fprintf( stderr, "SEED = %u\n", r.getSeed() );
#endif
std::generate( v1.begin(), v1.end(), [&]() { return r.rand_uint(n); } );
std::generate( v2.begin(), v2.end(), [&]() { return r.rand_uint(n); } );
#ifdef DEBUG
char buff[STRBUF_SIZE];
fprintf( stderr, "%s\n", dump( buff, v1.begin(), v1.end() ));
fprintf( stderr, "%s\n", dump( buff, v2.begin(), v2.end() ));
#endif
std::sort( v1.begin(), v1.end() );
std::sort( v2.begin(), v2.end() );
p_merge( "TOP ", 0, 0, 0,
v1.begin(), v1.end(), v2.begin(), v2.end(), v3.begin(), v3.end() );
assert( std::is_sorted( v3.begin(), v3.end() ));
#ifdef DEBUG
fprintf( stderr, "FINAL : %s\n", dump( buff, v3.begin(), v3.end() ));
#endif
}