1

我正在努力思考如何让线程相互发送信号。

我的设计:

main 函数创建一个主线程来协调一堆其他工作线程。main 函数还创建了工作线程,因为工作线程在 main 中编程的时间间隔产生和退出。主线程需要能够向这些工作线程发出信号并对它们进行信号广播,而工作线程必须向主线程发出信号(pthread_cond_signal)。由于每个线程都需要一个 pthread_mutex 和 pthread_cond 我用这些变量创建了一个 Worker 类和一个 Master 类。现在这就是我卡住的地方。C ++不允许您将成员函数作为 pthread_create(...) 处理程序传递,因此我必须在内部创建一个静态处理程序并将指向自身的指针传递给 reinterpret_cast 它以使用其类数据...

void Worker::start() {
pthread_create(&thread, NULL, &Worker::run, this);
}

void* Worker::run(void *ptr) {
Worker* data = reinterpret_cast<Worker*>(ptr);
}

我遇到的问题(可能是错误的)设置是,当我将一组工作指针传递给主线程时,它会发出一个不同的工作引用信号,因为我认为演员做了某种复制。所以我尝试了 static_cast 和相同的行为。

我只需要某种设计,Master 和 worker 可以相互 pthread_cond_wait(...) 和 pthread_cond_signal(...) 。

编辑 1

添加:

private:
    Worker(const Worker&);

还是行不通。

4

2 回答 2

2

编辑修复了所有版本中的潜在种族:

1./1 b使用 C++0x中概述的从 (mutex+condition+counter) 构建的信号量没有信号量?如何同步线程?
2.使用“反向”等待来确保信号得到预期工作人员的确认

我真的建议使用 c++11 风格<thread><condition_variable>实现这一点。

我有两个(半个)示威。他们每个人都假设您有 1 个 master 驱动 10 个 worker。每个工人在工作之前都会等待一个信号。

我们将使用std::condition_variable(与 a 一起工作std::mutex)来做信号。第一个版本和第二个版本之间的区别在于信号发送的方式:

  • 1. 通知任何工人,一次一个:
  • 1。使用工人结构
  • 2.通知所有线程,协调哪个receiver worker响应

1. 通知任何工人,一次一个:

这是最简单的做法,因为几乎没有协调:

#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>

using namespace std;

class semaphore 
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
    std::mutex mx;
    std::condition_variable cv;
    unsigned long count;
public:
    semaphore() : count() {} 
    void notify();
    void wait();
};

static void run(int id, struct master& m);

struct master
{
    mutable semaphore sem;

    master()
    {
        for (int i = 0; i<10; ++i)
            threads.emplace_back(run, i, ref(*this));
    }

    ~master() {
        for(auto& th : threads) if (th.joinable()) th.join(); 
        std::cout << "done\n";
    }

    void drive()
    {
        // do wakeups
        for (unsigned i = 0; i<threads.size(); ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%100));
            sem.notify();
        }
    }

  private:
    vector<thread> threads;
};

static void run(int id, master& m)
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

int main()
{
    master instance;
    instance.drive();
}

/// semaphore members
void semaphore::notify()
{
    lock_guard<mutex> lk(mx);
    ++count;
    cv.notify_one();
}

void semaphore::wait()
{
    unique_lock<mutex> lk(mx);
    while(!count)
        cv.wait(lk);
    --count;
}

1。使用工人结构

注意,如果你有一个非静态成员函数的worker类,你可以做同样的小修改:worker::run

struct worker
{
    worker(int id) : id(id) {}

    void run(master& m) const;

    int id;
};

// ...
struct master
{
    // ...

    master()
    {
        for (int i = 0; i<10; ++i)
            workers.emplace_back(i);

        for (auto& w: workers)
            threads.emplace_back(&worker::run, ref(w), ref(*this));
    }

// ...

void worker::run(master& m) const
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

一个警告

  • cv.wait()可能会遭受虚假唤醒,其中条件变量实际上没有被提高(例如,在操作系统信号处理程序的情况下)。这是任何平台上的条件变量都会发生的常见事情。

以下方法解决了这个问题:

2.通知所有线程,协调哪个receiver worker

使用标志来指示哪个线程打算接收信号:

struct master
{
    mutable mutex mx;
    mutable condition_variable cv;
    int signaled_id;               // ADDED

    master() : signaled_id(-1)
    {

让我们假设它driver变得更有趣,并希望以特定(随机...)顺序向所有工作人员发出信号:

    void drive()
    {
        // generate random wakeup order
        vector<int> wakeups(10);
        iota(begin(wakeups), end(wakeups), 0);
        random_shuffle(begin(wakeups), end(wakeups));

        // do wakeups
        for (int id : wakeups)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%1000));
            signal(id);
        }
    }

  private:
    void signal(int id)                // ADDED id
    {
        unique_lock<mutex> lk(mx);

        std::cout << "signaling " << id << "\n";

        signaled_id = id;              // ADDED put it in the shared field
        cv.notify_all();

        cv.wait(lk, [&] { return signaled_id == -1; });
    }

现在我们要做的就是确保接收线程检查它的 id 是否匹配:

m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;
m.cv.notify_all();

这结束了虚假唤醒。

完整的代码清单/现场演示:

于 2013-07-20T00:02:26.520 回答
0

目前尚不清楚您的确切情况是什么,但似乎您正在使用容器来保存在 中创建的“工人”实例,main并将它们传递给您的“主人”。如果是这种情况,您可以使用一些补救措施。您需要选择一个适合您的实施的方案。

  • 将对容器的引用传递给mainMaster。
  • 更改容器以保存(智能)指向工人的指针。
  • 使容器成为“Master”本身的一部分,这样就不需要传递给它了。
  • 为您的 Worker 类实现适当的析构函数、复制构造函数和赋值运算符(换句话说,遵守三法则)。

从技术上讲,由于pthread_create()是 C API,因此传递给它的函数指针需要具有 C 链接 ( extern "C")。您不能使 C++ 类的方法具有 C 链接,因此您应该定义一个外部函数:

extern "C" { static void * worker_run (void *arg); }

class Worker { //...
};

static void * worker_run (void *arg) {
    return Worker::run(arg);
}
于 2013-07-19T22:43:01.240 回答