3

我的程序有一个 shared queue,主要分为两部分:

一个用于将类的实例推request送到queue另一个访问request中的多个对象queue并处理这些对象。request是一个非常简单的类(仅用于测试),带有一个string req字段。

我正在研究第二部分,在此过程中,我想保留一个scheduling thread和多个(在我的示例中为两个)executing threads

我想要一个单独的原因scheduling thread是为了减少访问的数量lockunlock操作queueby multiple executing threads

我正在使用pthread库,我的调度和执行函数如下所示:

void * sched(void* elem) {

    queue<request> *qr = static_cast<queue<request>*>(elem);

    pthread_t pt1, pt2;

    if(pthread_mutex_lock(&mut) == 0) {
        if(!qr->empty()) {
            int result1 = pthread_create(&pt1, NULL, execQueue, &(qr->front()));
            if (result1 != 0) cout << "error sched1" << endl;
qr->pop();
        }
        if(!qr->empty()) {
            int result2 = pthread_create(&pt2, NULL, execQueue, &(qr->front()));
            if (result2 != 0) cout << "error sched2" << endl;
qr->pop();
        }

        pthread_join(pt1, NULL);
        pthread_join(pt2, NULL);

        pthread_mutex_unlock(&mut);
    }

    return 0;
}

void * execQueue(void* elem) {

    request *r = static_cast<request*>(elem);
    cout << "req is: " << r->req << endl; // req is a string field

    return 0;
}

简单地说,每个execQueue线程都有一个要执行的线程,并且只是输出一个通过void* elem参数传递给它的请求。

sched使用线程调用 in main()(如果您想知道如何调用,main()如下所示)

pthread_t schedpt;
int schresult = pthread_create(&schedpt, NULL, sched, &q);
if (schresult != 0) cout << "error sch" << endl;

pthread_join(schedpt, NULL);

并且该sched函数本身从 中创建多个(此处为两个)executing threadspops request,并通过调用多个线程(pthread_create 然后 ptrhead_join)来queue执行s。requestexecQueue

问题是程序的奇怪行为。

当我检查队列中的大小和元素而不创建线程并在多个线程上调用它们时,它们正是我所期望的。

但是,当我用多个线程运行程序时,它会打印出来

1 个项目在队列中。2 个项目在队列中。req 是:req 是:FIRST!��(x'�j|1��rj|p�rj|1����第一个!”'�j|!�'�j|�'�j| P��(�(��(1� ��i|p��i|

最后一行不断变化。

所需的输出是

1 个项目在队列中。2 个项目在队列中。req 是:FIRST req 是:FIRST

我想要么是我execQueue在多个线程上调用的方式,要么是我pop()错的方式,但我无法找出问题所在,也找不到任何来源来参考正确用法。

请帮助我。忍受我笨拙地使用 pthread,因为我是初学者。

4

2 回答 2

3

这是您经过温和消毒的 c++11 版本,因为我需要一个简单的测试东西来安装 MSVC2013 :)

在Coliru现场观看

#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <queue>
#include <string>

struct request { std::string req; };

std::queue<request> q;
std::mutex queue_mutex;

void execQueue(request r) {
    std::cout << "req is: " << r.req << std::endl; // req is a string field
}

bool sched(std::queue<request>& qr) {
    std::thread pt1, pt2;

    {
        std::lock_guard<std::mutex> lk(queue_mutex);
        if (!qr.empty()) {
            pt1 = std::thread(&execQueue, std::move(qr.front()));
            qr.pop();
        }
        if (!qr.empty()) {
            pt2 = std::thread(&execQueue, std::move(qr.front()));
            qr.pop();
        }
    }

    if (pt1.joinable()) pt1.join();
    if (pt2.joinable()) pt2.join();

    return true;
}

int main()
{
    auto fut = std::async(sched, std::ref(q));
    if (!fut.get()) 
        std::cout << "error" << std::endl;
}

当然它现在实际上并没有做太多(因为队列中没有任务)。

于 2013-11-04T09:28:15.637 回答
3

您的队列包含objects,而不是指向 objects 的指针。您可以按原样寻址队列前面的对象operator &(),但是一旦弹出队列,该对象就消失了,并且该地址不再有效。当然,sched不在乎,但是execQueue发送该地址的功能确实可以

对您的代码最直接的修复是:

改变这个:

pthread_create(&pt1, NULL, execQueue, &(qr->front()));

对此:

// send a dynamic *copy* of the front queue node to the thread
pthread_create(&pt1, NULL, execQueue, new request(qr->front()));

并且您的线程 proc 应更改为:

void * execQueue(void* elem) 
{
    request *r = static_cast<request*>(elem);
    cout << "req is: " << r->req << endl; // req is a string field
    delete r;
    return nullptr;
}

也就是说,我可以想出更好的方法来做到这一点,但这应该可以解决您的直接问题,假设您的request对象类是可复制构造的,并且如果它具有动态成员,则遵循“三规则”

于 2013-11-02T20:35:48.267 回答