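I'm cleaning up ThreadSanitizer warnings in a large project. In this particular case, a spawned thread reads from a file producer. One or more decompression threads then run as part of a thread pool. Finally, a single thread does the actual processing by retrieving the decompressed blocks. This, of course, allows several blocks to be decompressed at the same time.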
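The project synchronizes in many places with an atomic bool and usleep(), notably including this one. That is certainly not ideal, and fixing it is one of the things assigned to me. But as long as the mutexes and locks are respected, I don't see what ThreadSanitizer would have to complain about (other than this probably being less efficient than using condition variables).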
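For comparison, the condition-variable alternative mentioned above might look roughly like the sketch below. It is not the project's code: `BoundedQueue` and `sum_through_queue` are hypothetical names invented for illustration, and the capacity of 4 is arbitrary. The idea is that waiting threads block until they are notified instead of polling with usleep().

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical bounded queue: push() blocks while the queue is full and
// pop() blocks while it is empty, so no thread has to poll with usleep().
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(m_);
        // The predicate form of wait() re-checks after spurious wakeups.
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(std::move(value));
        lock.unlock();
        not_empty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop_front();
        lock.unlock();
        not_full_.notify_one();
        return value;
    }

private:
    std::size_t capacity_;
    std::mutex m_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    std::deque<T> items_;
};

// Illustrative driver: a producer thread pushes 0..n-1 and the calling
// thread pops and sums them.
int sum_through_queue(int n) {
    BoundedQueue<int> q(4);
    std::thread producer([&q, n] {
        for (int i = 0; i < n; ++i) q.push(i);
    });
    int sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

Calling `sum_through_queue(100)` pushes 100 integers through a queue that holds at most 4 at a time. Every access to the shared deque happens under the mutex, which is the same property the usleep()-based code relies on.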
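ThreadSanitizer complains about a data race "as if synchronized via sleep" and gives the location of the usleep() call. My problem is this: synchronizing via sleep is certainly not ideal, but as long as the mutexes are respected, I don't see a data race. And as far as I understand how mutexes work, I believe they are being respected.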
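So I tried to put together a minimal set of reproduction steps to pin down exactly what ThreadSanitizer is complaining about.
Here is the code I came up with: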
// g++ --std=c++11 -lpthread as-if-synchronized-via-sleep.cpp -g -fsanitize=thread -pie -fPIC
#include <iostream>
#include <iomanip>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <chrono>
#include <cstdlib>
#include <unistd.h>

class Data {
public:
    char uncompressed[1024];
    char compressed[1024];
    std::atomic<bool> done = {false};
};

int main(int argc, char **argv) {
    std::atomic<bool> done = {false};
    std::atomic<int> count = {0};
    std::mutex m;
    std::vector<Data*> v;

    std::thread provider{[&](){
        while ( not done ) {
            while ( 100 < count ) {
                usleep(20);
                if ( done ) {
                    break;
                }
            }
            Data *data = new Data();
            // Simulate reading in compressed data
            for ( size_t i = 0; i < sizeof(data->uncompressed); ++i )
                data->uncompressed[i] = char(rand());
            std::unique_lock<std::mutex> l(m);
            v.push_back(data);
            l.unlock();
            ++count;
        }
        return;
    }};

    std::thread decompressor{[&](){
        while ( not done ) {
            while ( 0 == count ) {
                usleep(20);
                if ( done ) {
                    break;
                }
            }
            std::unique_lock<std::mutex> l(m);
            if ( v.empty() ) {
                continue;
            }
            Data* data = v.front();
            // Simulate decompressing it.
            for ( size_t i = 0; i < sizeof(data->compressed); ++i )
                data->compressed[i] = data->uncompressed[i] ^ char(0xff);
            data->done = true;
        }
    }};

    std::thread consumer{[&](){
        while ( not done ) {
            std::unique_lock<std::mutex> l(m);
            while ( not v.empty() ) {
                if ( done ) {
                    break;
                }
                if ( v.front()->done ) {
                    break;
                }
                l.unlock();
                usleep(20);
                l.lock();
            }
            if ( done ) {
                break;
            }
            // Simulate consuming it
            // NOTE: if the loop above exited because v is empty, v.front() is undefined behavior
            Data* data = v.front();
            v.erase(v.begin());
            l.unlock();
            // Pretend we're doing stuff with decompressed data
            std::vector<char> vv(1,1);
            for ( std::size_t i = 0; i < sizeof(data->uncompressed); ++i ) {
                if ( data->uncompressed[i] )
                    vv.back() += data->uncompressed[i];
                else
                    vv.emplace_back();
            }
            delete data;
            size_t n = 0;
            for ( auto c : vv ) {
                std::cout << std::hex << std::setw(2) << (int)c;
            }
        }
    }};

    std::cout << "Main sleeping" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    done = true;
    std::cout << "Main joining..." << std::endl;
    provider.join();
    decompressor.join();
    consumer.join();
    std::cout << "Cleaning up" << std::endl;
    std::unique_lock<std::mutex> l(m);
    while ( not v.empty() ) {
        delete v.back();
        v.erase(std::next(v.rbegin()).base());
    }
    std::cout << "Done!" << std::endl;
    return 0;
}
With the code compiled using GCC 4.9.1 as shown in the first comment, I get the following from ThreadSanitizer at the end of execution:
Main sleeping
4f72ffffffa8ffffffb9Main joining...
==================
WARNING: ThreadSanitizer: data race (pid=64255)
Write of size 8 at 0x7d100000dfc8 by thread T3:
#0 operator delete(void*) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:592 (libtsan.so.0+0x000000049480)
#1 deallocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 (a.out+0x000000004ec9)
#2 deallocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:383 (a.out+0x000000004b98)
#3 _M_destroy /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:535 (a.out+0x000000005fbc)
#4 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:166 (libstdc++.so.6+0x0000000b5b30)
#5 ~__shared_count /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:666 (libstdc++.so.6+0x0000000b5b30)
#6 ~__shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:914 (libstdc++.so.6+0x0000000b5b30)
#7 ~shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr.h:93 (libstdc++.so.6+0x0000000b5b30)
#8 execute_native_thread_routine /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:95 (libstdc++.so.6+0x0000000b5b30)
Previous atomic write of size 4 at 0x7d100000dfc8 by main thread:
#0 __tsan_atomic32_fetch_add /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interface_atomic.cc:468 (libtsan.so.0+0x0000000206ce)
#1 __exchange_and_add /home/keithb/include/c++/4.9.1/ext/atomicity.h:49 (a.out+0x0000000021f4)
#2 __exchange_and_add_dispatch /home/keithb/include/c++/4.9.1/ext/atomicity.h:82 (a.out+0x0000000022ab)
#3 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:146 (a.out+0x000000007e99)
#4 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:666 (a.out+0x000000007346)
#5 std::__shared_ptr<std::thread::_Impl_base, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:914 (a.out+0x000000007019)
#6 std::shared_ptr<std::thread::_Impl_base>::~shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:93 (a.out+0x000000007045)
#7 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031eb)
#8 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)
Location is heap block of size 64 at 0x7d100000dfc0 allocated by main thread:
#0 operator new(unsigned long) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:560 (libtsan.so.0+0x0000000496c2)
#1 allocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:104 (a.out+0x000000004e28)
#2 allocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:357 (a.out+0x000000004aed)
#3 __shared_count<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:616 (a.out+0x000000004735)
#4 __shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:1090 (a.out+0x00000000443f)
#5 shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:316 (a.out+0x00000000425f)
#6 allocate_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:588 (a.out+0x000000004118)
#7 make_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:604 (a.out+0x000000003e74)
#8 _M_make_routine<std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/thread:193 (a.out+0x000000003925)
#9 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031b9)
#10 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)
Thread T3 (tid=64259, running) created by main thread at:
#0 pthread_create /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:877 (libtsan.so.0+0x000000047bf3)
#1 __gthread_create /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:662 (libstdc++.so.6+0x0000000b5be0)
#2 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>) /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:142 (libstdc++.so.6+0x0000000b5be0)
#3 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)
SUMMARY: ThreadSanitizer: data race /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 deallocate
==================
Cleaning up
Done!
ThreadSanitizer: reported 1 warnings
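After all threads have been joined, tsan finally complains. The report does not carry the "as if synchronized via sleep" note, though; it appears to be about the destructor of the lambda, specifically its shared pointer. Since that is a C++11 feature, and a Google search turns up plenty of evidence that ThreadSanitizer doesn't get along well with C++11, I suspect this isn't a "real" race in itself.
Although... given that the lambda is used as the thread's entry point... I wonder whether this is a genuine race in which the thread hasn't fully exited before the lambda is destroyed? That would be compiler-bug territory, so I'd rather not dig into it any further (scope creep, etc.).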
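On the lifetime worry, a small sketch may help. `std::thread` stores its own copy of the callable it is given, so destroying the original lambda object does not pull anything out from under the running thread, and `join()` synchronizes with the thread's completion. The function name `run_copied_lambda` is hypothetical, and the sketch says nothing about what libstdc++ 4.9.1 does internally with the shared state seen in the report.

```cpp
#include <atomic>
#include <thread>

// Hypothetical helper: starts a thread from a lambda whose original object
// goes out of scope before the thread is joined.
int run_copied_lambda() {
    std::atomic<int> result{0};
    std::thread t;
    {
        int local = 7;
        auto fn = [local, &result] { result = local * 6; };
        t = std::thread(fn);  // the thread keeps its own copy of fn
    }                         // fn and local are destroyed here
    t.join();                 // join() synchronizes with thread completion
    return result;
}
```

Here `local` is captured by value, so the thread's copy of the lambda carries everything it needs, and `result` outlives the thread because it is declared outside the inner scope.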
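So my questions are (I know they're a bit open-ended; please help me narrow them down):
In this case, why doesn't ThreadSanitizer detect an "as if synchronized via sleep" data race in the producer, decompressor, and consumer? And is it possible (and can you provide an example) to modify this code to...
A) ...produce a real data race that triggers a ThreadSanitizer warning but would not "usually" cause a crash (the current production version of the software in question does not seem to crash because of this supposed data race)?
B) ...produce a real data race that at first glance doesn't look like one, but that knowledge and/or experience shows is one?
C) ...produce a false positive from ThreadSanitizer, where there is in fact no data race?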
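To make (A) and (B) concrete, here is a minimal sketch of the sleep-and-poll pattern with the flag type as a template parameter. The name `publish_and_poll` is hypothetical. With `Flag = std::atomic<bool>` every cross-thread access is ordered by the atomic, so there is no data race. With `Flag = bool` the flag and the payload are both accessed without synchronization, which is a genuine data race that one would expect ThreadSanitizer to report, even though the program will usually appear to work. Whether that report also carries the "as if synchronized via sleep" note is an assumption, not something verified here.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: a writer publishes a payload and sets a flag, while
// the caller polls the flag with a short sleep between checks.
template <typename Flag>
int publish_and_poll() {
    int payload = 0;
    Flag ready{false};
    std::thread writer([&] {
        payload = 42;  // plain write, published through the flag
        ready = true;  // atomic store, or a racy plain store if Flag is bool
    });
    while (!ready) {
        // Sleeping between polls does not, by itself, order the accesses.
        std::this_thread::sleep_for(std::chrono::microseconds(20));
    }
    int seen = payload;  // race-free only if the flag load was atomic
    writer.join();
    return seen;
}
```

`publish_and_poll<std::atomic<bool>>()` is the race-free form. `publish_and_poll<bool>()` is the racy variant, and is only meant to be built with -fsanitize=thread to see what the tool says.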