0

我必须编写一个与并发线程一起工作的程序,用于处理 OpenCV Mat 图像。每个线程从队列中选择一个图像,对其进行处理并将结果放入另一个队列。我使用 Mat 图像的线程安全模板队列(如您在代码中所见)。

但是线程的奇怪行为是:如果我多次启动程序,每次我在单线程的详细处理次数中获得不同的结果(我插入的计数器“添加”用于监控单线程处理的图像数量) .

第一个线程(零)总是在做所有的细节(在这个例子中,10),但其余的线程,不要。有时每个线程都进行 10 次细化,有时 3 次,有时 5...2.. 新更改(条件变量和关键部分)线程仅执行 1 次操作。

我不知道问题出在哪里……为什么会这样。

我在这里传递了我的代码,并要求您检查它并在您的意见中告诉我,有什么问题......我很绝望。

这是代码:

#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <opencv2\highgui\highgui.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>

using namespace std;
using namespace cv;

/*thread safe queue*/

template<typename T>
class coda_concorr
{
private:
    std::queue<T> la_coda;
    HANDLE mutex;

public: 
    bool elemento;
    coda_concorr()
    {
        mutex = CreateMutex(NULL,FALSE,NULL);
        }
    ~coda_concorr()
    {}
    void push(T& data)
    {
        WaitForSingleObject(mutex,INFINITE);
        la_coda.push(data);
        ReleaseMutex(mutex);
        }
    bool vuota() const
    {
        WaitForSingleObject(mutex,INFINITE);
        bool RetCode = la_coda.empty();
        ReleaseMutex(mutex);
        return RetCode;
    }
    bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        while (la_coda.empty()){
            ReleaseMutex(mutex);
            return false;
        }
        WaitForSingleObject(mutex,INFINITE);
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
};


struct Args
{
    coda_concorr<cv::Mat> in;
    coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};


CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION  Lock1;
CRITICAL_SECTION  Lock2;
CRITICAL_SECTION  Lock3;
CRITICAL_SECTION  Lock4;

bool stop;

//initial populating queue
void puts (void* param){
    Args* arg = (Args*)param;
    int i=0;
    Mat image;

    while(!arg->in.vuota()){
        arg->in.try_pop(image);
        arg->out->push(image);
        i++;        
        WakeConditionVariable(&NonVuoto1);
        }
    //fine  
    cout<<endl<<"Thread (PUSH) terminato con "<<i<<" elaborazioni."<<endl;
    WakeConditionVariable(&NonVuoto1);
    _endthread();
}

//grey funct
void grey (void *param){
    Mat temp1,temp2;
    int add = 0;
    Args* arg = (Args*)param;
    while(true){
        EnterCriticalSection(&Lock1);
        //se vuoto
        while(arg->in.vuota() && !stop){
             SleepConditionVariableCS(&NonVuoto1,&Lock1,INFINITE);
            }
            if(stop==true){
            LeaveCriticalSection(&Lock1);
            break;
            }
        arg->in.try_pop(temp1);
        cvtColor(temp1,temp2,CV_BGR2GRAY);
        arg->out->push(temp2);
        add++;
        cout<<endl<<"grey ha fatto: "<<add<<endl;
        LeaveCriticalSection(&Lock1);
        WakeConditionVariable(&NonVuoto2);
        }
    //fine  
    cout<<endl<<"Thread (GREY) terminato con "<<add<<" elaborazioni."<<endl;
    _endthread();
}

//threshold funct
void soglia(void *param){
    Mat temp1a,temp2a;
    int add=0;
    Args* arg = (Args*)param;
    while(true){
        EnterCriticalSection(&Lock2);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto2,&Lock2,INFINITE);
            }
        if(stop==true){
            LeaveCriticalSection(&Lock2);
            break;
            }
        arg->in.try_pop(temp1a);
        threshold(temp1a,temp2a,128,255,THRESH_BINARY);
        arg->out->push(temp2a);
        add++;
        LeaveCriticalSection(&Lock2);
        WakeConditionVariable(&NonVuoto3);
        cout<<endl<<"soglia ha fatto: "<<add<<endl;
        }
        //fine 
     cout<<endl<<"Thread (SOGLIA) terminato con "<<add<<" elaborazioni."<<endl;
     _endthread();
}

//erode/dilate funct
void finitura(void *param){
    Mat temp1b,temp2b,temp2c;
    int add = 0;
    Args* arg = (Args*)param;
    //come consumatore
    while(true){
        EnterCriticalSection(&Lock3);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto3,&Lock3,INFINITE);
            }
        if(stop==TRUE){
            LeaveCriticalSection(&Lock3);
            break;
            }   
        arg->in.try_pop(temp1b);
        erode(temp1b,temp2b,cv::Mat());
        dilate(temp2b,temp2c,Mat());
        arg->out->push(temp2c);
        add++;
        LeaveCriticalSection(&Lock3);
        WakeConditionVariable(&NonVuoto4);
        cout<<endl<<"erode ha fatto: "<<add<<endl;
        }
     //fine 
     cout<<endl<<"Thread (ERODE) terminato con "<<add<<" elaborazioni."<<endl;
    _endthread();
}

//contour funct
void contorno (void *param){
    Mat temp;
    int add=0;
    Args* arg = (Args*)param;
    //come consumatore
    while(true){
        EnterCriticalSection(&Lock4);
        while(arg->in.vuota() && stop == false){
             SleepConditionVariableCS(&NonVuoto4,&Lock4,INFINITE);
            }
        if(stop==TRUE){
            LeaveCriticalSection(&Lock4);
            break;
            }   
    //esegue pop
    arg->in.try_pop(temp);
    //trova i contorni
    vector<vector<Point>> contorni;
    findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
    //disegna i contoni in un'immagine
    Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
    Scalar colors[3];
    colors[0] = Scalar(255,0,0);
    colors[1] = Scalar(0,255,0);
    colors[2] = Scalar(0,0,255);
    for (size_t idx = 0; idx < contorni.size(); idx++){
        drawContours(dst,contorni,idx,colors[idx %3]);
        }

    //come produttore
    arg->out->push(dst);
    add++;
    cout<<endl<<"cont ha fatto: "<<add<<endl;
    LeaveCriticalSection(&Lock4);
    }
    cout<<endl<<"Thread (CONTOUR) terminato con "<<add<<" elaborazioni."<<endl;
   _endthread();
}

//main
int main()
{

    coda_concorr<cv::Mat> ingresso;
    coda_concorr<cv::Mat> uscita;

    InitializeConditionVariable(&NonVuoto1);
    InitializeConditionVariable(&NonVuoto2);
    InitializeConditionVariable(&NonVuoto3);
    InitializeConditionVariable(&NonVuoto4);
    InitializeCriticalSection(&Lock1);
    InitializeCriticalSection(&Lock2);
    InitializeCriticalSection(&Lock3);
    InitializeCriticalSection(&Lock4);


    LARGE_INTEGER count1, count2, freq;
    double elapsed;


    Mat temp[10];
    Mat out;

    //dichiarazione code
    Args dati0,dati1,dati2,dati3,dati4;


    //avvio contatori
    QueryPerformanceFrequency(&freq);   
    QueryPerformanceCounter (&count1);

    for(int i=0;i<10;i++){
        temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
        ingresso.push(temp[i]);
    }

    //next queue pointer
    dati0.in=ingresso;
    dati0.out=&dati1.in;
    dati1.out=&dati2.in;
    dati2.out=&dati3.in;
    dati3.out=&dati4.in;
    dati4.out=&uscita;



    //handle
    HANDLE handle0,handle1,handle2,handle3,handle4;

    //start threads
    handle0 = (HANDLE) _beginthread(puts,0,&dati0);
    handle1 = (HANDLE) _beginthread(grey,0,&dati1);
    handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
    handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
    handle4 = (HANDLE) _beginthread(contorno,0,&dati4);

    cout<<endl<<"..Join dei threads..."<<endl;

    //join
    WaitForSingleObject(handle0,INFINITE);
    WaitForSingleObject(handle1,INFINITE);
    WaitForSingleObject(handle2,INFINITE);
    WaitForSingleObject(handle3,INFINITE);
    WaitForSingleObject(handle4,INFINITE);



    //chiusura contatori
    QueryPerformanceCounter (&count2);

    CloseHandle(handle0);
    CloseHandle(handle1);
    CloseHandle(handle2);
    CloseHandle(handle3);
    CloseHandle(handle4);

    elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;


    cout <<endl<<"Tempo di esecuzione approssimativo: " <<elapsed<<" ms."<<endl;
        system("PAUSE");
    return 0;
}

如果前一个线程将所有图像都放在队列中,为什么下一个线程不这样做?

我在使用 Visual C++ 2010 OpenCV 2.4.4 的 Windows 7 64 位

请帮我看看问题出在哪里...

4

1 回答 1

2

看起来你想要实现的类似于工厂里的流水线工作,每个人都做他的工作,然后把它扔给下一个工人,直到所有的工作都完成。如果我错了,请纠正我。每个线程函数的习语是

void dowork(){
    while(noinput()){
        sleep(0.01);
    }
    while(getSomeInput()){
        processInput();
        queueResult();
    }
    displayAmountOfWorkDone();
}

您成功地使用互斥锁提供互斥。您的设计的问题是,一旦一个线程观察到他的输入队列非空,他就会消耗所有工作然后退出。由于线程调度和处理时间的原因processInput(),一个工人可以以比在他生产之前的工人更高的速率消耗他的输入。例如,在 init 和 datit 之间排队,这是可能的:

datit: see 0, sleep
init : see 10 - add 1 
datit: see 1, process
datit: see 0, exit and outputs 1
init : add 2
init : add 3
init : add 4
....
init : add 10

你需要改变设计。应该有一个不同的机制来表示工作已经结束。现在,您正在使用线程可以观察到的输入量。一个快速而肮脏的解决方法是给每个线程他期望处理的输入量,并将算法重写为

void dowork(){
    while(workIsnotDone()){//local test, no lock
        if(getSomeInput()){
             processInput();
             queueResult();
             updateWorkProgress();//local operation
        }
        else{
             sleep(0.01);
        }
    }
    displayAmountOfWorkDone();
}

更好的选择是将类设置coda_concorr为生产者-消费者机制。为此,您可以添加一个条件变量。每个线程是一个队列上的消费者和另一个队列上的生产者。您还可以添加一个字段来明确指定不再输入的内容。看看关于 SO 的另一个问题

于 2013-06-18T08:51:38.473 回答