1

我已经实现了一个锁定的哈希表,并创建了一个相同的单元测试来测试对表的并发读/写。在使用 clang (AppleClang) 进行线程清理后,它没有报告任何错误,但另一方面 GCC (g++8) 会溢出我认为是误报的行。或者他们可能是真的吗?我已经用 clang 对代码进行了多次测试,但找不到逻辑错误。

锁定查找表代码:


template <typename Key, typename Value, typename Hash = std::hash<Key>>
class LockedLookupTable
{
    class bucket_type;
public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;

    LockedLookupTable(unsigned num_buckets = 19, const Hash &hasher_ = Hash())
    : _buckets(num_buckets), hasher(hasher_)
    {
        for (unsigned i = 0; i < num_buckets; ++i)
            _buckets[i].reset(new bucket_type);
    }

    LockedLookupTable(const LockedLookupTable &other) = delete;
    LockedLookupTable &operator=(const LockedLookupTable &other) = delete;

    Value at(Key const &key, Value const &default_value = Value()) const
    {
        return get_bucket(key).at(key, default_value);
    }

    void insert(const Key &key, const Value &value)
    {
        get_bucket(key).insert(key, value);
    }

    void erase(const Key &key)
    {
        get_bucket(key).erase(key);
    }

    std::map<Key, Value> get_map() const
    {
        std::map<Key, Value> res;

        for (unsigned i = 0; i < _buckets.size(); ++i) {
            boost::unique_lock<boost::shared_mutex> lock(_buckets[i]->_mutex);
            for (typename bucket_type::bucket_iterator it = _buckets[i]->data.begin();
                 it != _buckets[i]->data.end();
                 ++it) {
                res.insert(*it);
            }
        }

        return res;
    }

    int max_collisions() { return _buckets[0]->data.size() ? _buckets[0]->data.size() - 1 : 0; }

    std::size_t size()
    {
        std::size_t count = 0;

        for (unsigned i = 0; i < _buckets.size(); ++i) {
            boost::unique_lock<boost::shared_mutex> lock(_buckets[i]->_mutex);
            for (typename bucket_type::bucket_iterator it = _buckets[i]->data.begin();
                 it != _buckets[i]->data.end();
                 ++it) {
                count++;
            }
        }

        return count;
    }
private:
    std::vector<std::unique_ptr<bucket_type>> _buckets;
    Hash hasher;

    bucket_type &get_bucket(Key const &key) const
    {
        const std::size_t bucket_index = hasher(key) % _buckets.size();
        return *_buckets[bucket_index];
    }

    class bucket_type
    {
        typedef std::pair<Key, Value> bucket_value;
        typedef std::vector<bucket_value> bucket_data;

    public:
        typedef typename bucket_data::iterator bucket_iterator;

        Value at(Key const &key, Value const &default_value)
        {
            boost::shared_lock<boost::shared_mutex> lock(_mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            return (found_entry == data.end()) ? default_value : found_entry->second;
        }

        void insert(Key const &key, Value const &value)
        {
            boost::unique_lock<boost::shared_mutex> lock(_mutex);
            bucket_iterator found_entry = find_entry_for(key);

            if (found_entry != data.end())
                data.erase(found_entry);

            data.push_back(bucket_value(key, value));
        }

        void erase(Key const &key)
        {
            boost::unique_lock<boost::shared_mutex> lock(_mutex);
            bucket_iterator const found_entry = find_entry_for(key);
            if (found_entry != data.end())
                data.erase(found_entry);
        }

        bucket_data data;
        mutable boost::shared_mutex _mutex;

    private:
        bucket_iterator find_entry_for(Key const &key)
        {
            return std::find_if(data.begin(), data.end(),
                [&](bucket_value const &item)
                {
                    return item.first == key;
                });
        }
    };
};

测试代码:


BOOST_AUTO_TEST_CASE(LockedLookupTableTest)
{
    clock_t int_begin = clock();
    LockedLookupTable<int, int> table(MAX_BUCKETS);
    std::atomic<bool> go{false}, done_read{false}, done_write{false}, fail{false};
    std::thread *r[MAX_THREADS], *w[MAX_THREADS];

#if SPLIT_THREADS
    printf("Dividing work between %d reader + %d writer threads: %d per thread.\n", MAX_THREADS, MAX_THREADS, (MAX_TABLE_SIZE/MAX_THREADS));
#endif

    // Populate for reading
    for (int j = 0; j < MAX_TABLE_SIZE; j++)
        table.insert(j, j);

    // Readers
    for (int i = 0; i < MAX_THREADS; i++) {
        r[i] = new std::thread([&go, &table, &done_read, &fail, i] () {
#if SPLIT_THREADS
            int start = MAX_TABLE_SIZE / MAX_THREADS * i;
            int end = start + MAX_TABLE_SIZE / MAX_THREADS;
#else
            int start = 0;
            int end = MAX_TABLE_SIZE;
#endif
            while (!go);

            clock_t t_begin = clock();

            for (int j = start; j < end; j++) {
                int val = table.at(j);
                if (j != val) {
                    char output[256];
                    sprintf(output, "INT/INT not equal: %d != %d\n", j, val);
                    done_read = true;
                    fail = true;
                    return;
                }
            }

            done_read = true;
            printf("INT/INT Thread #%d - Read Time %.5fs\n", i, double(clock() - t_begin) / CLOCKS_PER_SEC);
        });
    }

    // Writers
    for (int i = 0; i < MAX_THREADS; i++) {
        w[i] = new std::thread([&go, &table, &done_write, i] () {
#if SPLIT_THREADS
            int start = MAX_TABLE_SIZE / MAX_THREADS * i;
            int end = start + MAX_TABLE_SIZE / MAX_THREADS;
#else
            int start = 0;
            int end = MAX_TABLE_SIZE;
#endif
            while (!go);

            clock_t t_begin = clock();

            for (int j = start; j < end; j++)
                table.insert(j, j);

            done_write = true;
            printf("INT/INT Thread #%d - Write Time %.5fs\n", i, double(clock() - t_begin) / CLOCKS_PER_SEC);
        });
    }

    go.exchange(true);

    while (table.size() < MAX_TABLE_SIZE && !done_read && !done_write);

    if (fail)
        BOOST_FAIL("Failed read.");

    printf("INT/INT Read/Write Test Finished in %0.5fs\n", double(clock() - int_begin) / CLOCKS_PER_SEC);

    int_begin = clock();

    // Map Iteration
    std::map<int, int> map = table.get_map();
    int i = 0;
    for (auto c : map) {
        BOOST_CHECK_EQUAL(i, c.second);
        i++;
    }

    printf("INT/INT Map Iteration Test Finished in %0.5fs\n", double(clock() - int_begin) / CLOCKS_PER_SEC);

    // Final Integrity Check
    for (int j = 0; j < MAX_TABLE_SIZE; j++)
        BOOST_CHECK_EQUAL(j, table.at(j));

    printf("INT/INT Finished in %0.5fs\n", double(clock() - int_begin) / CLOCKS_PER_SEC);

    for (int i = 0; i < MAX_THREADS; i++) {
        if (r[i]->joinable()) r[i]->join();
        if (w[i]->joinable()) w[i]->join();
        delete r[i];
        delete w[i];
    }

    done_read.exchange(false);
    done_write.exchange(false);
    go.exchange(false);

    // Erase table contents.
    for (int i = 0; i < MAX_THREADS; i++) {
        w[i] = new std::thread([&go, &table, i] () {
#if SPLIT_THREADS
            int start = MAX_TABLE_SIZE / MAX_THREADS * i;
            int end = start + MAX_TABLE_SIZE / MAX_THREADS;
#else
            int start = 0;
            int end = MAX_TABLE_SIZE;
#endif
            while (!go);

            clock_t t_begin = clock();

            for (int j = start; j < end; j++)
                table.erase(j);

            printf("INT/INT Thread #%d - Erase Time %.5fs\n", i, double(clock() - t_begin) / CLOCKS_PER_SEC);
        });
    }

    go.exchange(true);

    while (table.size() > 0);

    printf("INT/INT Collisions: %d\n", table.max_collisions());

    for (int i = 0; i < MAX_THREADS; i++) {
        if (w[i]->joinable()) w[i]->join();
        delete w[i];
    }
}

GCC-8 TSan 错误:https ://travis-ci.org/horizo​​nxyz/horizo​​n/jobs/507336127#L1424

AppleClang 错误:https ://travis-ci.org/horizo​​nxyz/horizo​​n/jobs/507336131#L2950

4

0 回答 0