4

我正在清理一个大型项目中的 ThreadSanitizer 警告。特别是在这种确切的情况下,有一个从文件生产者读取的衍生线程。然后有一个或多个解压线程作为线程池的一部分。最后,有一个线程实际上通过检索解压缩块来进行处理。这当然允许同时解压缩多个块。

项目通过原子 bool 和 同步的位置有很多usleep(),尤其包括这个。当然,这并不理想,这是分配给我的事情之一。

但只要表示互斥锁和锁,我看不出 ThreadSanitizer 会抱怨什么问题(除了与使用条件变量相比可能会降低效率)。

ThreadSanitizer 抱怨数据竞争“好像通过睡眠同步”,并提供了usleep()调用位置。我的问题是,当然通过睡眠进行同步并不理想,但只要尊重互斥锁,我就不会看到数据竞争。据我所知,互斥锁的工作方式,我相信它们受到尊重。

因此,我试图创建一组最小的复制步骤,以便准确识别 ThreadSanitizer 所抱怨的内容。

这是我想出的代码:

// g++ --std=c++11 -lpthread as-if-synchronized-via-sleep.cpp -g -fsanitize=thread -pie -fPIC
#include <iostream>
#include <iomanip>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <chrono>
#include <cstdlib>

#include <unistd.h>

class Data {
public:
        char uncompressed[1024];
        char compressed[1024];
        std::atomic<bool> done = {false};
};

int main(int argc, char **argv) {
        std::atomic<bool> done = {false};
        std::atomic<int> count = {0};

        std::mutex m;
        std::vector<Data*> v;

        std::thread provider{[&](){
                while ( not done ) {
                        while ( 100 < count ) {
                                usleep(20);
                                if ( done ) {
                                        break;
                                }
                        }
                        Data *data = new Data();
                        // Simulate reading in compressed data
                        for ( size_t i = 0; i < sizeof(data->uncompressed); ++i )
                                data->uncompressed[i] = char(rand());
                        std::unique_lock<std::mutex> l(m);
                        v.push_back(data);
                        l.unlock();
                        ++count;
                }
                return;
        }};

        std::thread decompressor{[&](){
                while ( not done ) {
                        while ( 0 == count ) {
                                usleep(20);
                                if ( done ) {
                                        break;
                                }
                        }
                        std::unique_lock<std::mutex> l(m);
                        if ( v.empty() ) {
                                continue;
                        }
                        Data* data = v.front();
                        // Simulate decompressing it.
                        for ( size_t i = 0; i < sizeof(data->compressed); ++i )
                                data->compressed[i] = data->uncompressed[i] ^ char(0xff);
                        data->done = true;
                }
        }};

        std::thread consumer{[&](){
                while ( not done ) {
                        std::unique_lock<std::mutex> l(m);
                        while ( not v.empty() ) {
                                if ( done ) {
                                        break;
                                }
                                if ( v.front()->done ) {
                                        break;
                                }
                                l.unlock();
                                usleep(20);
                                l.lock();
                        }
                        if ( done ) {
                                break;
                        }
                        // Simulate consuming it
                        Data* data = v.front();
                        v.erase(v.begin());
                        l.unlock();
                        // Pretend we're doing stuff with decompressed data
                        std::vector<char> vv(1,1);
                        for ( std::size_t i = 0; i < sizeof(data->uncompressed); ++i ) {
                                if ( data->uncompressed[i] )
                                        vv.back() += data->uncompressed[i];
                                else
                                        vv.emplace_back();
                        }
                        delete data;
                        size_t n = 0;
                        for ( auto c : vv ) {
                                std::cout << std::hex << std::setw(2) << (int)c;
                        }
                }
        }};
        std::cout << "Main sleeping" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        done = true;
        std::cout << "Main joining..." << std::endl;
        provider.join();
        decompressor.join();
        consumer.join();

        std::cout << "Cleaning up" << std::endl;
        std::unique_lock<std::mutex> l(m);
        while ( not v.empty() ) {
                delete v.back();
                v.erase(std::next(v.rbegin()).base());
        }

        std::cout << "Done!" << std::endl;
        return 0;
}

使用 GCC 4.9.1 编译的代码如第一条注释所示,我在执行结束时从 ThreadSanitizer 收到以下消息:

Main sleeping
4f72ffffffa8ffffffb9Main joining...
==================
WARNING: ThreadSanitizer: data race (pid=64255)
  Write of size 8 at 0x7d100000dfc8 by thread T3:
    #0 operator delete(void*) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:592 (libtsan.so.0+0x000000049480)
    #1 deallocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 (a.out+0x000000004ec9)
    #2 deallocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:383 (a.out+0x000000004b98)
    #3 _M_destroy /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:535 (a.out+0x000000005fbc)
    #4 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:166 (libstdc++.so.6+0x0000000b5b30)
    #5 ~__shared_count /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:666 (libstdc++.so.6+0x0000000b5b30)
    #6 ~__shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr_base.h:914 (libstdc++.so.6+0x0000000b5b30)
    #7 ~shared_ptr /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/bits/shared_ptr.h:93 (libstdc++.so.6+0x0000000b5b30)
    #8 execute_native_thread_routine /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:95 (libstdc++.so.6+0x0000000b5b30)

  Previous atomic write of size 4 at 0x7d100000dfc8 by main thread:
    #0 __tsan_atomic32_fetch_add /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interface_atomic.cc:468 (libtsan.so.0+0x0000000206ce)
    #1 __exchange_and_add /home/keithb/include/c++/4.9.1/ext/atomicity.h:49 (a.out+0x0000000021f4)
    #2 __exchange_and_add_dispatch /home/keithb/include/c++/4.9.1/ext/atomicity.h:82 (a.out+0x0000000022ab)
    #3 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:146 (a.out+0x000000007e99)
    #4 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:666 (a.out+0x000000007346)
    #5 std::__shared_ptr<std::thread::_Impl_base, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:914 (a.out+0x000000007019)
    #6 std::shared_ptr<std::thread::_Impl_base>::~shared_ptr() /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:93 (a.out+0x000000007045)
    #7 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031eb)
    #8 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

  Location is heap block of size 64 at 0x7d100000dfc0 allocated by main thread:
    #0 operator new(unsigned long) /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:560 (libtsan.so.0+0x0000000496c2)
    #1 allocate /home/keithb/include/c++/4.9.1/ext/new_allocator.h:104 (a.out+0x000000004e28)
    #2 allocate /home/keithb/include/c++/4.9.1/bits/alloc_traits.h:357 (a.out+0x000000004aed)
    #3 __shared_count<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:616 (a.out+0x000000004735)
    #4 __shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr_base.h:1090 (a.out+0x00000000443f)
    #5 shared_ptr<std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:316 (a.out+0x00000000425f)
    #6 allocate_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::allocator<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> > >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:588 (a.out+0x000000004118)
    #7 make_shared<std::thread::_Impl<std::_Bind_simple<main(int, char**)::<lambda()>()> >, std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/bits/shared_ptr.h:604 (a.out+0x000000003e74)
    #8 _M_make_routine<std::_Bind_simple<main(int, char**)::<lambda()>()> > /home/keithb/include/c++/4.9.1/thread:193 (a.out+0x000000003925)
    #9 thread<main(int, char**)::<lambda()> > /home/keithb/include/c++/4.9.1/thread:135 (a.out+0x0000000031b9)
    #10 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

  Thread T3 (tid=64259, running) created by main thread at:
    #0 pthread_create /home/keithb/dev/gcc/gcc_4_9_1_release/libsanitizer/tsan/tsan_interceptors.cc:877 (libtsan.so.0+0x000000047bf3)
    #1 __gthread_create /home/keithb/dev/gcc/gcc_4_9_1_build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:662 (libstdc++.so.6+0x0000000b5be0)
    #2 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>) /home/keithb/dev/gcc/gcc_4_9_1_release/libstdc++-v3/src/c++11/thread.cc:142 (libstdc++.so.6+0x0000000b5be0)
    #3 main /home/keithb/dev/mytest/thread-sanitizer-checks/main.cpp:102 (a.out+0x000000002bc9)

SUMMARY: ThreadSanitizer: data race /home/keithb/include/c++/4.9.1/ext/new_allocator.h:110 deallocate
==================
Cleaning up
Done!
ThreadSanitizer: reported 1 warnings

在所有线程都加入后,tsan 最终抱怨“好像通过睡眠同步”,这似乎是 lambda 的析构函数,特别是共享指针。由于这是 c++11 的一项功能,而且谷歌提供了大量证据表明 ThreadSanitizer 不太喜欢 C++11,我怀疑这本身并不是一场“真正的”比赛。

虽然......事实上,lambda 被用作线程的入口点......我想知道这是否是一个真正的竞赛,其中线程在 lamda 被销毁之前还没有完全退出?那将是编译器错误的领域,所以我不想进一步调查(范围蠕变等)。

所以我的问题(我知道它们有点开放,请帮我缩小范围?)

在这种情况下,为什么 ThreadSanitizer 不会在生产者、解压缩器和消费者中检测到“好像通过睡眠同步”的数据竞争?而且,是否有可能(并且您可以提供示例)将此代码修改为...

A) ...产生一个真正的数据竞争,它会发出一个不会“通常”导致崩溃的 ThreadSanitizer 警告(相关软件的当前生产版本似乎不会因为这种假定的数据竞争而崩溃)?

B) ...产生一个真正的数据竞赛,乍一看似乎不是数据竞赛,但知识和/或经验表明并非如此

C) ...从 ThreadSanitizer 产生误报,但实际上没有数据竞争。

4

1 回答 1

1

我认为 ThreadSanitizer 正在看到睡眠,它警告您,当使用睡眠而不是使用互斥锁或条件变量执行同步时,它检测到的模式很常见。请参阅https://github.com/google/sanitizers/wiki/ThreadSanitizerReportFormat

您应该将代码移动到使用条件变量。

于 2021-12-02T23:18:17.890 回答