0

我在我的代码中使用parallel_pipeline函数。有时当我的条件得到满足时,它会停止管道,有时它不会。当流控制调用停止时,即使在此之后它也不会终止,而是调用它的下一部分并在控制台上打印然后控制台输出变得就像它一直处于无限循环并且什么都不做。

代码是:

#include <iostream> 
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <tbb/pipeline.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
#include <tbb/tbbmalloc_proxy.h>    
#include <tbb/tick_count.h>
using namespace std;
using namespace tbb;

#define pi 3.141593
#define FILTER_LEN  265

double coeffs[ FILTER_LEN ] =
{
  0.0033473431384214393,0.000032074683390218124,0.0033131082058404943,0.0024777666109278788,
  -0.0008968429179843104,-0.0031973449396977684,-0.003430943381749411,-0.0029796565504781646,
  -0.002770673157048994,-0.0022783059845596586,-0.0008531818129514857,0.001115432556294998,
  0.0026079871108133294,0.003012423848769931,0.002461420635709332,0.0014154004589753215,
  0.00025190669718400967,-0.0007608257014963959,-0.0013703600874774068,-0.0014133823230551277,
  -0.0009759556503342884,-0.00039687498737139273,-0.00007527524701314324,-0.00024181463305012626,
  -0.0008521761947454302,-0.00162618205097997,-0.002170446498273018,-0.002129903305507943,
  -0.001333859049002249,0.00010700092934983156,0.0018039564602637683,0.0032107930896349583,
  0.0038325849735515363,0.003416201274366522,0.002060848732332109,0.00017954815260431595,
  -0.0016358832300944531,-0.0028402136847527387,-0.0031256650498727384,-0.0025374271571154713,
  -0.001438370315670195,-0.00035115295209013755,0.0002606730012030533,0.0001969569787142967,
  -0.00039635535951198597,-0.0010886127490608972,-0.0013530057243606405,-0.0008123200399262436,
  0.0005730271959526784,0.0024419465938120906,0.004133717273258681,0.0049402122577746265,
  0.0043879285604252714,0.002449549610687005,-0.00040283102645093463,-0.003337730734820209,
  -0.0054508346511294775,-0.006093057767824609,-0.005117609782189977,-0.0029293645861970417,
  -0.0003251033117661085,0.0018074390555649442,0.0028351284091668164,0.002623563404428517,
  0.0015692864792199496,0.0004127664681096788,-0.00009249878881824428,0.0004690173244168184,
  0.001964334172374759,0.0037256715492873485,0.004809640399145206,0.004395274594482053,
  0.0021650921193604,-0.0014888595443799124,-0.005534807968511709,-0.008642334104607624,
  -0.009668950651149259,-0.008104732391434574,-0.004299972815463919,0.0006184612821881392,
  0.005136551428636121,0.007907786753766152,0.008241212326068366,0.00634786595941524,
  0.003235610213062744,0.00028882736660937287,-0.001320994685952108,-0.0011237433853145615,
  0.00044213409507615003,0.0022057106517524255,0.00277593527678719,0.0011909915058737617,
  -0.0025807757230413447,-0.007497632882437637,-0.011739520895818884,-0.013377018279057393,
  -0.011166543231844196,-0.005133056165990026,0.0032948631959114935,0.011673660427968408,
  0.017376415708412904,0.018548938130314566,0.014811760899506572,0.007450782505155853,
  -0.001019540069785369,-0.007805775815783898,-0.010898333714715424,-0.00985364043415772,
  -0.005988406030111452,-0.001818560524968024,0.000028552677472614846,-0.0019938756495376363,
  -0.007477684025727061,-0.013989430449615033,-0.017870518868849213,-0.015639422062597726,
  -0.005624959109456065,0.010993528170353541,0.03001263681283932,0.04527492462846608,
  0.050581340787164114,0.041949186532860346,0.019360612460662185,-0.012644336735920483,
  -0.0458782599058412,-0.07073838953156347,-0.0791205623455818,-0.06709535677423759,
  -0.03644544574795176,0.005505370370858695,0.04780486657828151,0.07898800597378192,
  0.0904453420042807,0.07898800597378192,0.04780486657828151,0.005505370370858695,
  -0.03644544574795176,-0.06709535677423759,-0.0791205623455818,-0.07073838953156347,
  -0.0458782599058412,-0.012644336735920483,0.019360612460662185,0.041949186532860346,
  0.050581340787164114,0.04527492462846608,0.03001263681283932,0.010993528170353541,
  -0.005624959109456065,-0.015639422062597726,-0.017870518868849213,-0.013989430449615033,
  -0.007477684025727061,-0.0019938756495376363,0.000028552677472614846,-0.001818560524968024,
  -0.005988406030111452,-0.00985364043415772,-0.010898333714715424,-0.007805775815783898,
  -0.001019540069785369,0.007450782505155853,0.014811760899506572,0.018548938130314566,
  0.017376415708412904,0.011673660427968408,0.0032948631959114935,-0.005133056165990026,
  -0.011166543231844196,-0.013377018279057393,-0.011739520895818884,-0.007497632882437637,
  -0.0025807757230413447,0.0011909915058737617,0.00277593527678719,0.0022057106517524255,
  0.00044213409507615003,-0.0011237433853145615,-0.001320994685952108,0.00028882736660937287,
  0.003235610213062744,0.00634786595941524,0.008241212326068366,0.007907786753766152,
  0.005136551428636121,0.0006184612821881392,-0.004299972815463919,-0.008104732391434574,
  -0.009668950651149259,-0.008642334104607624,-0.005534807968511709,-0.0014888595443799124,
  0.0021650921193604,0.004395274594482053,0.004809640399145206,0.0037256715492873485,
  0.001964334172374759,0.0004690173244168184,-0.00009249878881824428,0.0004127664681096788,
  0.0015692864792199496,0.002623563404428517,0.0028351284091668164,0.0018074390555649442,
  -0.0003251033117661085,-0.0029293645861970417,-0.005117609782189977,-0.006093057767824609,
  -0.0054508346511294775,-0.003337730734820209,-0.00040283102645093463,0.002449549610687005,
  0.0043879285604252714,0.0049402122577746265,0.004133717273258681,0.0024419465938120906,
  0.0005730271959526784,-0.0008123200399262436,-0.0013530057243606405,-0.0010886127490608972,
  -0.00039635535951198597,0.0001969569787142967,0.0002606730012030533,-0.00035115295209013755,
  -0.001438370315670195,-0.0025374271571154713,-0.0031256650498727384,-0.0028402136847527387,
  -0.0016358832300944531,0.00017954815260431595,0.002060848732332109,0.003416201274366522,
  0.0038325849735515363,0.0032107930896349583,0.0018039564602637683,0.00010700092934983156,
  -0.001333859049002249,-0.002129903305507943,-0.002170446498273018,-0.00162618205097997,
  -0.0008521761947454302,-0.00024181463305012626,-0.00007527524701314324,-0.00039687498737139273,
  -0.0009759556503342884,-0.0014133823230551277,-0.0013703600874774068,-0.0007608257014963959,
  0.00025190669718400967,0.0014154004589753215,0.002461420635709332,0.003012423848769931,
  0.0026079871108133294,0.001115432556294998,-0.0008531818129514857,-0.0022783059845596586,
  -0.002770673157048994,-0.0029796565504781646,-0.003430943381749411,-0.0031973449396977684,
  -0.0008968429179843104,0.0024777666109278788,0.0033131082058404943,0.000032074683390218124,
  0.0033473431384214393
};

class MyBuffer 
{
    public:
    double *acc;
    double *buffer;
    int start,end;
    static int j;

    MyBuffer()
    {
        start=0;
        end=0;

       buffer=new double[150264];
       acc=new double[150000];
       fill_n(buffer,150264,0);

    }

    ~MyBuffer()
    {
        delete[] buffer;
        delete[] acc;
    }

    int startnumber()
    {
        return start;
    }

    int endnumber()
    {
        return end;
    }

};

typedef concurrent_bounded_queue<MyBuffer>  QueueMyBufferType;
QueueMyBufferType chunk_queue;

atomic<bool> stop_flag;
atomic<bool> stop_filter;

int MyBuffer::j=0;
int queueloopcount=30;

void input_function()                               
{   
   stop_flag = false;     

   cout<<"thread reached to call input function " <<endl;
   ofstream o("testing sinewave.csv");

   int counter=0;
   while(counter<150000)    
  { 
    //  cout<<"value of counter is \t" <<counter << endl;

        MyBuffer *b=new MyBuffer;                                                       
        b->start=(FILTER_LEN-1+(counter));
        b->end=(5264+(counter));

    //  cout<<"value of b.start is and b.end is "<<b->start<<"\t" <<b->end<<endl;

        for(int i =b->startnumber(); i <b->endnumber(); i++)
         {
               b->buffer[i] = sin(700 * (2 * pi) * (i / 5000.0));
               o<<b->buffer[i]<<endl;   
         }
         chunk_queue.push(*b);

         counter+=5000;
        // cout<<"value of queueloopcount is "<< queueloopcount << endl;
     }

     cout<<"all data is perfectly generated" <<endl;
}

int main()
{

    int ntokens = 8;

    thread inputfunc(input_function);
    tick_count t1,t2;
    ofstream o("filter700Hz.csv");
    t1=tick_count::now();

     bool stop_pipeline = false;    
     stop_filter=false;


     inputfunc.join();

     parallel_pipeline(ntokens,make_filter<void,MyBuffer*>
     ( 
             filter::parallel,[&](flow_control& fc)->MyBuffer*
         {
             if(queueloopcount==0)
             {
                 fc.stop();
                 cout<<"pipeline stopped"<<endl;
             }
             else
             {
                  MyBuffer *b=new MyBuffer;
                  chunk_queue.pop(*b);
                 {
                     cout<<"value of start and end popped is "<<b->startnumber()<<"\t"<<b->endnumber()<<endl;
                     queueloopcount--;
                 }
                 return b;
             }
         }
     )&

    make_filter<MyBuffer*, void>
    (
        filter::serial,[&](MyBuffer* b)
        {
             cout<<"value of second filter start is and end is \t "<< b->startnumber() << "\t" << b->endnumber() <<endl;
        }
     )
    );

        cout<<"now i am out" <<endl;
    o.close();
    t2=tick_count::now();

    cout << "\n Time elapsed is \n\n" <<(t2-t1).seconds()<<endl;        

    return 0;
}

请帮助找出代码错误的地方。

4

1 回答 1

0

这段代码中的问题是过滤器,即第一阶段的并行,这导致 flow_control 对象出现问题,该对象试图弹出,它是一个阻塞调用,因此它阻塞,解决方案是你可能应该有一个串行的第一个过滤器,只有创建一个空的 MyBuffer* 并在没有更多工作到期时停止管道。然后有一个并行的第二个滤波器来执行实际工作,最后是一个串行(按顺序)输出级。

于 2013-06-04T05:25:16.210 回答