4

我之前问过这个问题:如何从 boost 线程在主线程上运行一个函数,并向该函数传递参数

所以现在我正在尝试这个:

下面是一个控制台 C++ 项目,它完整地模拟了我的大型项目中的情况

TestServicePost.cpp

#include "stdafx.h"
#include "SomeClass.h"


// Console entry point: creates one SomeClass and calls its update() in an
// endless loop.
int _tmain(int argc, _TCHAR* argv[])
{
    // Automatic storage instead of a raw `new` that was never deleted, so the
    // destructor runs if this scope is ever left (e.g. by an exception).
    SomeClass s;
    while(true)
    {
        s.update();
    }
    return 0; // not reached: the loop above has no exit
}

SomeClass.h

#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <queue>

// Plain data record describing one note. All fields are public and are filled
// in field by field by the code that parses the service response.
class ServiceNote
{
public:
    // Zeroes the integer fields so a default-constructed note never exposes
    // indeterminate values; the string fields start out empty.
    ServiceNote()
        : Action(0), CNoteId(0), NoteId(0), SessionId(0), X(0), Y(0)
    {
    }

    // Returns "LamasaTech.MultiWall.PostNote.<Kind>.<Shape>.<Colour>", where
    // <Kind> is "Node" when NoteType equals "Normal" and "Header" otherwise.
    // Does not modify the note, so it is callable on const objects.
    std::string getType() const
    {
        const char* const kind = (NoteType.compare("Normal") == 0) ? "Node" : "Header";
        std::stringstream typeSS;
        typeSS << "LamasaTech.MultiWall.PostNote." << kind << "." << Shape << "." << Colour;
        return typeSS.str();
    }

    int Action;
    int CNoteId;
    std::string Colour;
    int NoteId;
    std::string NoteType;
    int SessionId;
    std::string Shape;
    std::string Style;
    std::string Text;
    int X;
    int Y;
};

// Polls a remote HTTP service from a background thread and hands the notes it
// receives to addToQueue() via an io_service; update() drains the queue.
class SomeClass
{
public:
    // Starts the servicePoller thread, which runs pollService().
    SomeClass();
    ~SomeClass();
    // Pops every entry currently held in pendingNotes.
    void update();

private:
    // Notes waiting to be consumed by update().
    std::queue<ServiceNote> pendingNotes;
    // Appends sn to pendingNotes.
    void addToQueue(ServiceNote sn);
    // Thread body: calls getMessage() in a loop until interruption is requested.
    void pollService(boost::asio::io_service* svc);
    // Fetches one message over HTTP, posts addToQueue() to svc for a parsed
    // note, and returns the message id to request next.
    int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId);
    // Background thread started by the constructor.
    boost::thread servicePoller;
};

SomeClass.cpp

#include "stdafx.h"
#include "SomeClass.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/asio/signal_set.hpp>

// Object-like macros must expand to a bare value: a definition written as
// "= 0;" would paste those tokens into every expression that uses the name.
#define POLL_SERVICE 0
// Delay between polls, in milliseconds (passed to boost::posix_time::milliseconds).
#define POLLING_WAIT_TIME 1000
// NOTE(review): presumably milliseconds as well (30 minutes) — not used in the visible code.
#define SAVE_SESSION_EVERY 1800000

// Creates a function-local io_service, calls run() on it, then starts the
// servicePoller thread with that object's address.
// NOTE(review): io_servicePoller is destroyed when this constructor returns,
// while pollService()/getMessage() keep using the pointer (svc->post) on the
// other thread, so that pointer dangles. The io_service has to outlive the
// thread, e.g. by being owned by the class.
SomeClass::SomeClass()
{
    boost::asio::io_service io_servicePoller;
    // NOTE(review): nothing has been posted yet, so this call presumably
    // returns immediately without running any handler — confirm.
    io_servicePoller.run();
    // The new thread executes pollService(&io_servicePoller) on this object.
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller));
    /*boost::asio::io_service io_sessionSaver;
    boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM);
    signalsSaver.async_wait( boost::bind(&boost::asio::io_service::stop, &io_sessionSaver));
    sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/
}

// Stops the polling thread before the object goes away.
// pollService() runs on servicePoller and calls member functions of this
// object, so the thread must have finished before the members are destroyed.
SomeClass::~SomeClass()
{
    if (servicePoller.joinable())
    {
        // join() is itself an interruption point for the calling thread; keep
        // a pending interruption from escaping the destructor as an exception.
        boost::this_thread::disable_interruption noInterrupt;

        // Picked up by the interruption check and the sleep in pollService().
        servicePoller.interrupt();
        // Blocks until the poll loop has returned; a request that is already
        // in flight in getMessage() completes first.
        servicePoller.join();
    }
}

// Drains pendingNotes: every queued note is copied out and then removed.
void SomeClass::update()
{
    for (; !pendingNotes.empty(); pendingNotes.pop())
    {
        // Copy of the oldest note; nothing is done with it yet.
        ServiceNote current(pendingNotes.front());
    }
}

// Appends a copy of the given note to the back of pendingNotes.
void SomeClass::addToQueue(ServiceNote sn)
{
    this->pendingNotes.push(sn);
}

// Poll loop: repeatedly asks getMessage() for the next message of session "49",
// starting at id 1, and leaves the loop once interruption of the current
// thread has been requested.
//
// svc  forwarded unchanged to getMessage().
void SomeClass::pollService(boost::asio::io_service* svc)
{
    int nextId = 1;
    for (;;)
    {
        const bool stopRequested = boost::this_thread::interruption_enabled()
                                   && boost::this_thread::interruption_requested();
        if (stopRequested)
            break;

        const int requestedId = nextId;
        nextId = getMessage(svc, "49", requestedId);

        // An unchanged id means nothing new was consumed: wait before retrying.
        if (nextId == requestedId)
            boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME));
    }
}

// Requests /Service.svc/message/<sessionId>/<messageId> from mw.rombus.com over
// a blocking HTTP/1.0 connection. When a non-empty body is received it is
// parsed as a JSON note and addToQueue(note) is posted to *svc.
//
// svc        io_service that receives the addToQueue handler; must point to a
//            live io_service.
// sessionId  session segment of the request path.
// messageId  id of the message to request.
//
// Returns messageId + 1 when a complete, non-empty body was received (even if
// it could not be parsed as a note), otherwise messageId unchanged.
// Errors derived from std::exception are reported on std::cout, not propagated.
int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId)
{
    const char* const host = "mw.rombus.com";
    try
    {
        boost::asio::io_service io_service;

        // Get a list of endpoints corresponding to the server name.
        boost::asio::ip::tcp::resolver resolver(io_service);
        boost::asio::ip::tcp::resolver::query query(host, "http");
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

        // Try each endpoint until we successfully establish a connection.
        boost::asio::ip::tcp::socket socket(io_service);
        boost::asio::connect(socket, endpoint_iterator);

        // Form the request. "Connection: close" makes the server close the
        // socket after the response, so everything up to EOF is the content.
        boost::asio::streambuf request;
        std::ostream request_stream(&request);
        request_stream << "GET /Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n";
        request_stream << "Host: " << host << "\r\n";
        request_stream << "Accept: */*\r\n";
        request_stream << "Connection: close\r\n\r\n";

        // Send the request.
        boost::asio::write(socket, request);

        // Read the response status line; the streambuf grows as needed.
        boost::asio::streambuf response;
        boost::asio::read_until(socket, response, "\r\n");

        // Check that the response is OK.
        std::istream response_stream(&response);
        std::string http_version;
        response_stream >> http_version;
        unsigned int status_code = 0;
        response_stream >> status_code;
        std::string status_message;
        std::getline(response_stream, status_message);
        if (!response_stream || http_version.substr(0, 5) != "HTTP/")
        {
            // Invalid response.
            return messageId;
        }
        if (status_code != 200)
        {
            // Any status other than 200 is treated as "nothing to consume".
            return messageId;
        }

        // Read the response headers, which are terminated by a blank line,
        // and skip over them.
        boost::asio::read_until(socket, response, "\r\n\r\n");
        std::string header;
        while (std::getline(response_stream, header) && header != "\r")
        {
        }

        // Read the rest of the body until EOF BEFORE parsing. read_until() may
        // have buffered only part of the body (or none of it), and parsing
        // that fragment would drop or truncate the note.
        boost::system::error_code error;
        while (boost::asio::read(socket, response,
                boost::asio::transfer_at_least(1), error))
        {
            // Data accumulates in `response`.
        }
        if (error != boost::asio::error::eof)
            throw boost::system::system_error(error);

        if (response.size() > 0)
        {
            std::stringstream body;
            body << &response;
            try
            {
                boost::property_tree::ptree pt;
                boost::property_tree::read_json(body, pt);
                ServiceNote sn;
                sn.Action = pt.get<int>("Action");
                sn.CNoteId = pt.get<int>("CNoteId");
                sn.Colour = pt.get<std::string>("Colour");
                sn.NoteId = pt.get<int>("NoteId");
                sn.NoteType = pt.get<std::string>("NoteType");
                sn.SessionId = pt.get<int>("SessionId");
                sn.Shape = pt.get<std::string>("Shape");
                sn.Style = pt.get<std::string>("Style");
                sn.Text = pt.get<std::string>("Text");
                sn.X = pt.get<int>("X");
                sn.Y = pt.get<int>("Y");
                // Queue the note through the io_service rather than touching
                // pendingNotes from this thread.
                svc->post(boost::bind(&SomeClass::addToQueue, this, sn));
            }
            catch (std::exception const&)
            {
                // Malformed note: skip it. The id is still advanced below so
                // polling does not stall on the same message.
            }
            messageId++;
        }
    }
    catch (std::exception const& e)
    {
        std::cout << "Exception: " << e.what() << "\n";
    }
    return messageId;
}

但我得到Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4.,就在这一行执行之后:

svc->post(boost::bind(&SomeClass::addToQueue, this, sn));

我也没能把 io_service 定义为类成员(这样我就可以在析构函数 ~SomeClass() 中使用它),这一点也希望能得到帮助

如果 io_service.post 对我来说不是最好的解决方案,请推荐其他方案。正如你所看到的,我有一个构造函数、一个析构函数和一个每个 tick 都会被调用的 update 方法。我曾尝试只用这个方法配合队列,但它不是线程安全的。有没有简单的线程安全 FIFO 可以使用?

4

2 回答 2

4

在 SomeClass 的构造函数中,您实际上做了以下几件事:

  1. 定义一个本地 io_service实例。
  2. 调用它的run()成员函数,它立即返回,因为 io_service 没有工作。
  3. 将本地对象的地址传递给另一个线程。

这肯定行不通。

请注意,io_service::run() 是一种“消息循环”,因此它会阻塞调用它的线程。不要在对象的构造函数中调用它。

于 2012-12-09T09:02:55.570 回答
2

我想出了如何将 io_service 声明为类成员:

// Declared as a SomeClass member: the shared_ptr keeps the io_service alive for
// both the object and the polling thread, which receives its own copy.
boost::shared_ptr< boost::asio::io_service > io_servicePoller;

在构造函数中我做了以下事情:

// Allocates the io_service on the heap, stores it in the io_servicePoller
// member, and starts the polling thread with a copy of that shared_ptr.
SomeClass::SomeClass()
{
    io_servicePoller.reset(new boost::asio::io_service);
    servicePoller = boost::thread(
        boost::bind(&SomeClass::pollService, this, io_servicePoller));
}

一些清理

// Shuts the poller down before the members are destroyed.
SomeClass::~SomeClass()
{
    // Request interruption of the polling thread.
    servicePoller.interrupt();
    // Stop the io_service that the thread posts to.
    io_servicePoller->stop();
    // Wait for the polling thread to exit.
    servicePoller.join();
}

在 update() 中,我调用 run() 来执行已投递的处理程序,把笔记加入队列,然后在 while 循环中读取它们

void SomeClass::update()
{   
    io_servicePoller->run();
    io_servicePoller->reset();
    while(!pendingNotes.empty())
    {
        ServiceNote sn = pendingNotes.front();

        pendingNotes.pop();
    }
}

并将我的成员签名更改为void SomeClass::pollService(boost::shared_ptr< boost::asio::io_service > svc)

所以会发生什么:

  1. 应用程序启动
  2. 创建我的类的实例
  3. 我的类创建 io_service 并启动线程
  4. 线程从服务中获取项目
  5. 主线程检查 io_service 队列,处理完后返回
  6. 然后它使用队列

感谢 Igor R。没有他我无法做到

另外,我是从 http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4 学会如何创建共享指针的

于 2012-12-09T09:39:39.560 回答