8

I want to create a class whose methods can be called from multiple threads. But instead of executing a method in the thread that calls it, the class should execute all of them in a thread of its own. No result needs to be returned, and the calling thread must not be blocked.

My first attempt at an implementation is included below. The public methods push a function pointer and its data onto a job queue, which a worker thread then picks up. However, it is not particularly nice code, and adding new methods is cumbersome.

Ideally I would like to use it as a base class to which I can easily add methods (with a varying number of arguments) with a minimum of hassle and code duplication.

What is a better way to do this? Is there any existing code available that does something similar? Thanks

#include <queue>

using namespace std;

class GThreadObject
{
    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        void * data;
    };

public:
    void functionOne(char * argOne, int argTwo);

private:
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionOneProxy(void * buffer);
    void functionOneInternal(char * argOne, int argTwo);

};



#include <iostream>
#include <cstdlib>   // malloc/free
#include <cstring>   // memcpy
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * It should block on a condition, but Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    //Execute the function pointer with the attached data
    (*this.*receivedEvent->funcPtr)(receivedEvent->data);
}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{

    //Malloc an object the size of the function arguments
    int argumentSize = sizeof(char*)+sizeof(int);
    void * myData = malloc(argumentSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, &argOne, argumentSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = myData;
    myEvent->funcPtr = &GThreadObject::functionOneProxy;
    jobQueue.push(myEvent);

    //This would normally signal a thread condition variable; replaced with a direct call here to keep the example simple
    this->workerThread();
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

/*
 * This is the function I would like to remove if possible
 * Split the void * buffer into arguments for the internal Function
 */
void GThreadObject::functionOneProxy(void * buffer)
{
    char * cBuff = (char*)buffer;
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*)));
};

int main()
{
    GThreadObject myObj;

    myObj.functionOne("My Message", 23);

    return 0;
}

8 Answers

6

A Futures library is making its way into Boost and the C++ standard library. There is also something similar in ACE, but I would hesitate to recommend it to anyone (as @lothar already pointed out, it is the Active Object).
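As a rough illustration of the futures style, here is a minimal sketch using std::async from what later became C++11 (the free function and its arguments are made up for the example, they are not part of the original code):

#include <future>
#include <iostream>
#include <string>

// Hypothetical free function standing in for the work done by functionOne.
void functionOne(const std::string& argOne, int argTwo)
{
    std::cout << "functionOne: " << argOne << " " << argTwo << std::endl;
}

int main()
{
    // std::async runs the call on another thread. Note that the returned
    // future's destructor waits for the task to finish, so keep it alive
    // (or wait on it) rather than discarding it immediately.
    std::future<void> result = std::async(std::launch::async, functionOne, "My Message", 23);
    result.wait();
    return 0;
}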

answered 2009-05-29T01:05:34.647
3

Below is an implementation that does not require a "functionProxy" method. Although it is easier to add new methods, it is still messy.

Boost::Bind and "Futures" do seem like they would tidy a lot of this up. I think I will have a look at the Boost code and see how it works. Thanks for your suggestions, everyone.

GThreadObject.h

#include <queue>

using namespace std;

class GThreadObject
{

    template <int size>
    class VariableSizeContainter
    {
        char data[size];
    };

    class event
    {
        public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argTwo, int arg2);


private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argTwo, int arg2);
    void functionOneInternal(char * argOne, int argTwo);

};

GThreadObject.cpp

#include <iostream>
#include <cstdlib>   // malloc/free
#include <cstring>   // memcpy
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from queue
 * When a new event is received it executes the attached function pointer
 * Thread code removed to decrease clutter
 */
void GThreadObject::workerThread()
{
    //New Event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument.
     * This is the bit I would like to remove.
     * Only supports an 8-byte argument size, e.g. 2 ints OR pointer + int OR an 8-byte object.
     * Other data sizes would need to be added with an else if.
     * */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;

        void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr;

        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data));
    }

    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;

}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{

    //Malloc an object the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the data passed to this function into the buffer
    memcpy(myData, (char*)argStart, argSize);

    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);

    //This would normally signal a thread condition variable; replaced with a direct call here to keep the example simple
    this->workerThread();

}

/*
 * This is the public interface, Can be called from child threads
 * Instead of executing the event directly it adds it to a job queue
 * Then the workerThread picks it up and executes all tasks on the same thread
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;

    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}

main.cpp

#include <iostream>
#include "GThreadObject.h"

int main()
{

    GThreadObject myObj;

    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);


    return 0;
}

Edit: For completeness I made an implementation with Boost::bind. The key differences:

queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));

    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    func();
}

10,000,000 iterations of functionOne() took roughly 19 seconds with the Boost implementation, while the non-Boost implementation took only about 6.5 seconds, so it is roughly 3x slower. I would guess that finding a good non-locking queue would be the biggest performance bottleneck here. Still, it is quite a big difference.

answered 2009-05-29T14:10:18.653
2

The POCO library has something along these lines in its Threading section, called ActiveMethod (along with some related functionality, e.g. ActiveResult). The source code is readily available and easy to understand.
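For illustration, a rough sketch of how an ActiveMethod is typically declared (the class and method names are made up; check the POCO documentation for the exact signatures):

#include <iostream>
#include "Poco/ActiveMethod.h"
#include "Poco/ActiveResult.h"

// Each call to 'add' runs asynchronously on a background thread
// (POCO's default thread pool) instead of the caller's thread.
class ActiveAdder
{
public:
    ActiveAdder() : add(this, &ActiveAdder::addImpl) {}

    // Template parameters: result type, argument type, owner class.
    Poco::ActiveMethod<int, int, ActiveAdder> add;

private:
    int addImpl(const int& n)   // executed asynchronously
    {
        return n + 1;
    }
};

int main()
{
    ActiveAdder adder;
    Poco::ActiveResult<int> result = adder.add(41); // returns immediately
    result.wait();                                  // optional: wait for completion
    std::cout << result.data() << std::endl;
    return 0;
}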

answered 2009-05-29T01:20:12.060
1

You might be interested in Active Object, one of the ACE patterns of the ACE framework.

As Nikolai pointed out, futures are planned for standard C++ some time in the future (pun intended).

answered 2009-05-29T00:57:29.360
1

For extensibility and maintainability (and other -ilities) you could define an abstract class (or interface) for the "job" that the thread is to execute. Users of your thread pool would then implement this interface and pass a reference to their object to the thread pool. This is very similar to the Symbian Active Object design: every AO subclasses CActive and has to implement methods such as Run() and Cancel().

For simplicity your interface (abstract class) could be as simple as:

class IJob
{
public:
    virtual ~IJob() {}
    virtual void Run() = 0;
};

Then the thread pool, or the single thread accepting requests, would have something like:

class CThread
{
   <...>
public:
   void AddJob(IJob* iTask);
   <...>
};

Naturally you would have multiple jobs, which could have all kinds of extra setters/getters/attributes and whatever else you need in any walk of life. The only must, however, is to implement the Run() method, which performs the lengthy computation:

class CDumbLoop : public IJob
{
public:
    CDumbLoop(int iCount) : m_Count(iCount) {};
    ~CDumbLoop() {};
    void Run()
    {
        // Do anything you want here
    }
private:
    int m_Count;
};
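
A hypothetical usage, assuming the CThread sketched above takes ownership of the queued job:

CThread worker;

// Queued immediately; Run() later executes on the worker's own thread.
worker.AddJob(new CDumbLoop(1000000));
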
answered 2009-05-29T01:26:47.413
1

You could use Boost's Thread library to solve this. Something like this (half pseudo-code):


class GThreadObject
{
        ...

        public:
                GThreadObject()
                : _done(false)
                , _newJob(false)
                , _thread(boost::bind(&GThreadObject::workerThread, this))
                {
                }

                ~GThreadObject()
                {
                        _done = true;

                        _thread.join();
                }

                void functionOne(char *argOne, int argTwo)
                {
                        ...

                        _jobQueue.push(myEvent);

                        {
                                boost::lock_guard<boost::mutex> l(_mutex);

                                _newJob = true;
                        }

                        _cond.notify_one();
                }

        private:
                void workerThread()
                {
                        while (!_done) {
                                boost::unique_lock<boost::mutex> l(_mutex);

                                while (!_newJob) {
                                        _cond.wait(l);
                                }

                                Event *receivedEvent = _jobQueue.front();

                                ...
                        }
                }

        private:
                volatile bool             _done;
                volatile bool             _newJob;
                boost::thread             _thread;
                boost::mutex              _mutex;
                boost::condition_variable _cond;
                std::queue<Event*>        _jobQueue;
};

Also, note how RAII lets us keep this code smaller and easier to manage.

answered 2009-05-29T08:42:34.337
0

Here is a class I wrote for a similar purpose (I use it for event handling, but you could of course rename it to ActionQueue, and rename its methods).

You use it like this:

With the function you want to call:

void foo (const int x, const int y) { /*...*/ }

and an EventQueue:

EventQueue q;

you add the call:

q.AddEvent (boost::bind (foo, 10, 20));

and in the worker thread:

q.PlayOutEvents ();

Note: it should be fairly easy to add code that blocks on a condition instead of burning CPU cycles (a sketch of that follows the code below).

The code (Visual Studio 2003 with boost 1.34.1):

#pragma once

#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <vector>
#include <string>
#include <windows.h> // for Sleep()
using std::string;


// Records & plays out actions (closures) in a thread-safe manner.

class EventQueue
{
    typedef boost::function <void ()> Event;

public:

    const bool PlayOutEvents ()
    {
        // The copy is there to ensure there are no deadlocks.
        const std::vector<Event> eventsCopy = PopEvents ();

        BOOST_FOREACH (const Event& e, eventsCopy)
        {
            e ();
            Sleep (0);
        }

        return eventsCopy.size () > 0;
    }

    void AddEvent (const Event& event)
    {
        Mutex::scoped_lock lock (myMutex);

        myEvents.push_back (event);
    }

protected:

    const std::vector<Event> PopEvents ()
    {
        Mutex::scoped_lock lock (myMutex);

        const std::vector<Event> eventsCopy = myEvents;
        myEvents.clear ();

        return eventsCopy;
    }

private:

    typedef boost::recursive_mutex Mutex;
    Mutex myMutex;

    std::vector <Event> myEvents;

};
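A rough sketch of the blocking variant mentioned in the note above. It assumes the recursive mutex is changed to a plain boost::mutex, that a boost::condition_variable member named myCondition is added (with AddEvent calling myCondition.notify_one()), and that the Boost.Thread version in use provides boost::condition_variable; none of this is in the original class:

// Hypothetical replacement for PopEvents: waits until at least one event is queued.
// Requires <boost/thread/condition_variable.hpp>.
const std::vector<Event> WaitAndPopEvents ()
{
    boost::unique_lock<boost::mutex> lock (myMutex);

    while (myEvents.empty ())
        myCondition.wait (lock);    // releases the mutex while waiting

    const std::vector<Event> eventsCopy = myEvents;
    myEvents.clear ();

    return eventsCopy;
}
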

I hope this helps. :)

Martin Bilski

answered 2009-05-29T11:33:12.777
0

You should take a look at the Boost ASIO library. It is designed to dispatch events asynchronously, and it can be paired with the Boost Thread library to build the system you described.

You instantiate a single boost::asio::io_service object and schedule a series of asynchronous events (boost::asio::io_service::post or boost::asio::io_service::dispatch). Next, you call the run member function from n threads. The io_service object is thread-safe and guarantees that your asynchronous handlers will only be dispatched in a thread from which you called io_service::run.

The boost::asio::strand object is also useful for simple thread synchronization.
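A minimal sketch of that idea, pairing an io_service with a single worker thread (the wrapper class name is made up for the example):

#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

// Illustrative wrapper: public methods post work to an io_service that is
// drained by the object's own worker thread.
class GThreadObjectAsio
{
public:
    GThreadObjectAsio()
        : work_(io_),   // keeps run() from returning while the queue is empty
          thread_(boost::bind(&GThreadObjectAsio::runService, this))
    {
    }

    ~GThreadObjectAsio()
    {
        io_.stop();     // note: abandons handlers still queued
        thread_.join();
    }

    // Returns immediately; the handler executes on the worker thread.
    void functionOne(const std::string& argOne, int argTwo)
    {
        io_.post(boost::bind(&GThreadObjectAsio::functionOneInternal, this, argOne, argTwo));
    }

private:
    void runService() { io_.run(); }

    void functionOneInternal(const std::string& argOne, int argTwo)
    {
        std::cout << argOne << " " << argTwo << std::endl;
    }

    boost::asio::io_service       io_;
    boost::asio::io_service::work work_;
    boost::thread                 thread_;
};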

For what it's worth, I think the ASIO library is a very elegant solution to this problem.

answered 2009-05-29T12:22:34.097