1

我的服务器应用程序有一个奇怪的问题。我的系统很简单:我有 1 台以上的设备和一个通过网络进行通信的服务器应用程序。协议具有可变长度的二进制数据包,但标头固定(包含有关当前数据包大小的信息)。数据包示例:

char pct[maxSize] = {}
pct[0] = 0x5a //preambule
pct[1] = 0xa5 //preambule
pct[2] = 0x07 //packet size
pct[3] = 0x0A //command
... [payload]

该协议建立在命令应答的原则之上。

我使用 boost::asio 进行通信 - io_service 与线程拉(4 个线程)+ 异步读/写操作(下面的代码示例)并创建一个“查询周期” - 每 200 毫秒按计时器:

  • 从设备查询一个值
  • 获取结果,查询第二个值
  • 得到结果,再次启动计时器

这在 boost 1.53(调试和发布)上工作得很好。但随后我切换到 boost 1.54(尤其是在发布模式下)魔法开始了。我的服务器成功启动,连接到设备并开始“查询周期”。大约 30-60 秒一切正常(我收到数据,数据是正确的),但随后我开始在最后一次读取句柄上接收 asio::error (总是在一个地方)。错误类型:EOF。收到错误后,我必须断开与设备的连接。

谷歌搜索的一些时间给我关于 EOF 的信息表明另一端(在我的情况下是设备)启动了断开连接过程。但是,根据设备的逻辑,它不可能是真的。有人可以解释发生了什么吗?可能我需要设置一些套接字选项或定义吗?我看到两个可能的原因:

  • 我方初始化断开连接(出于某种原因,我不知道)并且 EOF 是此操作的答案。
  • 一些套接字超时触发。

我的环境:

  • 操作系统:Windows 7/8
  • 编译器:MSVC 2012 更新 3

主“查询周期”示例代码。改编自官方boost chat 示例 所有代码简化以减少空间:)

  • SocketWorker - 套接字的低级包装器
  • DeviceWorker - 设备通信类
  • ERes - 错误存储的内部结构
  • ProtoCmd 和 ProtoAnswer - 原始数组命令和答案的包装器(来自boost 聊天示例的 chat_message 模拟)
  • lw_service_proto 命名空间 - 预定义的命令和最大数据包大小

所以,代码示例。套接字包装器:

namespace b = boost;
namespace ba = boost::asio;

typedef b::function<void(const ProtoAnswer answ)> DataReceiverType;

class SocketWorker
{
private:
    typedef ba::ip::tcp::socket socketType;
    typedef std::unique_ptr<socketType> socketPtrType;
    socketPtrType devSocket;
    ProtoCmd      sendCmd;
    ProtoAnswer   rcvAnsw; 

    //[other definitions]

public:

//---------------------------------------------------------------------------
ERes SocketWorker::Connect(/*[connect settings]*/)
{
    ERes res(LGS_RESULT_ERROR, "Connect to device - Unknow Error");

    using namespace boost::asio::ip;
    boost::system::error_code sock_error;

    //try to connect
    devSocket->connect(tcp::endpoint(address::from_string(/*[connect settings ip]*/), /*[connect settings port]*/), sock_error);

    if(sock_error.value() > 0) {
        //[work with error]
        devSocket->close();
    }
    else {
        //[res code ok]
    } 

    return res;
}
//---------------------------------------------------------------------------
ERes SocketWorker::Disconnect()
{
    if (devSocket->is_open())
    {
        boost::system::error_code ec;
        devSocket->shutdown(bi::tcp::socket::shutdown_send, ec);
        devSocket->close();
    }
    return ERes(LGS_RESULT_OK, "OK");
}

//---------------------------------------------------------------------------
//query any cmd
void SocketWorker::QueryCommand(const ProtoCmd cmd, DataReceiverType dataClb)
{
    sendCmd = std::move(cmd); //store command
    if (sendCmd .CommandLength() > 0)
    {
        ba::async_write(*devSocket.get(), ba::buffer(sendCmd.Data(), sendCmd.Length()),
                        b::bind(&SocketWorker::HandleSocketWrite,
                                this, ba::placeholders::error, dataClb));
    }
    else
    {
        cerr << "Send command error: nothing to send" << endl;
    }
}

//---------------------------------------------------------------------------
// boost socket handlers
void SocketWorker::HandleSocketWrite(const b::system::error_code& error, 
                                                   DataReceiverType dataClb)
{
    if (error)
    {
        cerr << "Send cmd error: " << error.message() << endl;
        //[send error to other place]
        return;
    }

    //start reading header of answer (lw_service_proto::headerSize == 3 bytes)
    ba::async_read(*devSocket.get(),
                   ba::buffer(rcvAnsw.Data(), lw_service_proto::headerSize),
                   b::bind(&SocketWorker::HandleSockReadHeader, 
                           this, ba::placeholders::error, dataClb)); 
}
//---------------------------------------------------------------------------
//handler for read header
void SocketWorker::HandleSockReadHeader(const b::system::error_code& error, DataReceiverType dataClb)
{
    if (error)
    {
        //[error working]
        return;
    }

    //decode header (check preambule and get  full packet size) and read answer payload
    if (rcvAnsw.DecodeHeaderAndGetCmdSize())
    {
      ba::async_read(*devSocket.get(),
                     ba::buffer(rcvAnsw.Answer(), rcvAnsw.AnswerLength()),
                     b::bind(&SocketWorker::HandleSockReadBody, 
                             this, ba::placeholders::error, dataClb));
    }
}
//---------------------------------------------------------------------------
//handler for andwer payload
void SocketWorker::HandleSockReadBody(const b::system::error_code& error, DataReceiverType dataClb)
{
    //if no error - send anwser to 'master'
    if (!error){
        if (dataClb != nullptr) 
            dataClb(rcvAnsw);
    }
    else{
        //[error process]

        //here i got EOF in release mode
    }
}

};

设备工作者

class DeviceWorker
{
private:
    const static int LW_QUERY_TIME = 200;
    LWDeviceSocketWorker sockWorker;
    ba::io_service&    timerIOService;
    typedef std::shared_ptr<ba::deadline_timer> TimerPtr;
    TimerPtr        queryTimer;
    bool            queryCycleWorking;

    //[other definitions]
public:

ERes DeviceWorker::Connect()
{
    ERes intRes = sockWorker.Connect(/*[connect settings here]*/);

    if(intRes != LGS_RESULT_OK) {
        //[set result to error]
    }
    else {
        //[set result to success]

        //start "query cycle"
        StartNewCycleQuery();
    }

    return intRes;
}
//---------------------------------------------------------------------------
ERes DeviceWorker::Disconnect()
{
    return sockWorker.Disconnect();
}
//---------------------------------------------------------------------------
void DeviceWorker::StartNewCycleQuery()
{
    queryCycleWorking = true;
    //start timer
    queryTimer = make_shared<ba::deadline_timer>(timerIOService, bt::milliseconds(LW_QUERY_TIME));
    queryTimer->async_wait(boost::bind(&DeviceWorker::HandleQueryTimer,
                                       this, boost::asio::placeholders::error));
}
//---------------------------------------------------------------------------
void DeviceWorker::StopCycleQuery()
{
    //kill timer
    if (queryTimer) 
        queryTimer->cancel();

    queryCycleWorking = false;
}
//---------------------------------------------------------------------------
//timer handler
void DeviceWorker::HandleQueryTimer(const b::system::error_code& error)
{
    if (!error)
    {
        ProtoCmd cmd;    
        //query for first value
        cmd.EncodeCommandCore(lw_service_proto::cmdGetAlarm, 1);
        sockWorker.QueryCommand(cmd, boost::bind(&DeviceWorker::ReceiveAlarmCycle, 
                                this, _1));    
    }
}
//---------------------------------------------------------------------------
//receive first value
void DeviceWorker::ReceiveAlarmCycle(ProtoAnswer adata)
{
    //check and fix last bytes (remove \r\n from some commands)
    adata.CheckAndFixFooter();

    //[working with answer]

    if (queryCycleWorking)
    { 
        //query for second value
        ProtoCmd cmd;
        cmd.EncodeCommandCore(lw_service_proto::cmdGetEnergyLevel, 1);
        sockWorker.QueryCommand(cmd, b::bind(&DeviceWorker::ReceiveEnergyCycle, 
                                      this, _1));
    }
}
//---------------------------------------------------------------------------
//receive second value
void DeviceWorker::ReceiveEnergyCycle(ProtoAnswer edata)
{
    //check and fix last bytes (remove \r\n from some commands)
    edata.CheckAndFixFooter();

    //[working with second value]

    //start new "query cycle"
    if (queryCycleWorking)
        StartNewCycleQuery();
}

};

欢迎任何想法:)

编辑: 经过几次测试,我看到了另一张图片:

  • 此问题仅在 boost 1.54 上重现(调试和发布模式,发布 - 更快),boost 1.53 没有更多错误(也许我清理代码不佳,然后第一次重建......)
  • 使用 boost 1.54 和 1 个线程(而不是 4 个)都可以正常工作

我还花了一些时间在调试器和增强源上,并得出一些结论:

  • 当我收到 EOF 时,我的数据已经完全收到。
  • 这个 EOF 表示在这个操作中没有要传输的内容,即套接字结果标志为 0(没有错误),但是如果 EOF(传输字节 == 0)则提升操作标志

此时我被迫开启 boost 1.53 ......

4

2 回答 2

0

我遇到了完全相同的问题,我很确定这是 boost::asio 1.54.0 的错误

是错误报告。

解决方案是有效地回到 1.53,尽管在错误报告页面中有一个适用于 1.54 的补丁。

于 2013-09-06T14:37:36.290 回答
0

如果您的应用程序在单个线程调用时工作正常,io_service::run()但在四个线程时失败,那么您很可能遇到了竞态条件。这种类型的问题很难诊断。一般来说你应该保证你devSocket最多有一个优秀async_read()async_write()操作。您当前的SocketWorker::QueryCommand()无条件调用实现async_write()可能违反记录的排序假设

该操作是根据对流async_write_some函数的零次或多次调用来实现的,称为组合操作。程序必须确保流不执行其他写入操作(例如async_write,流的async_write_some 函数,或执行写入的任何其他组合操作),直到此操作完成。

这个问题的经典解决方案是维护一个传出消息队列。如果先前的写入未完成,则将下一条传出消息附加到队列中。当上一次写入完成时,async_write()为队列中的下一条消息启动。使用多个线程调用io_service::run()时,您可能需要像链接的答案那样使用链。

于 2013-07-19T18:01:58.880 回答