7

我想创建一个自治线程,专门用于使用 boost 库(asio)从 UDP 套接字接收数据。这个线程应该是由UDP socket接收到的一些数据触发的无限循环。在我的应用程序中,我需要使用异步接收操作。

如果我使用同步函数 receive_from 一切都按预期工作。

但是,如果我使用 async_receive_from,则永远不会调用处理程序。由于我使用信号量来检测已接收到一些数据,因此程序锁定并且永远不会触发循环。

我已经验证(使用网络分析器)发送方设备在 UDP 套接字上正确发送数据。

我已在以下代码中隔离了问题。

#include <boost\array.hpp>
#include <boost\asio.hpp>
#include <boost\thread.hpp>
#include <boost\interprocess\sync\interprocess_semaphore.hpp>

#include <iostream>

typedef boost::interprocess::interprocess_semaphore Semaphore;

using namespace boost::asio::ip;

class ReceiveUDP
{
public:

    boost::thread*  m_pThread;

    boost::asio::io_service         m_io_service;
    udp::endpoint                   m_local_endpoint;
    udp::endpoint                   m_sender_endpoint;

    udp::socket                     m_socket;

    size_t      m_read_bytes;
    Semaphore   m_receive_semaphore;

    ReceiveUDP() :
        m_socket(m_io_service),
        m_local_endpoint(boost::asio::ip::address::from_string("192.168.0.254"), 11),
        m_sender_endpoint(boost::asio::ip::address::from_string("192.168.0.11"), 5550),
        m_receive_semaphore(0)
    {
        Start();
    }

    void Start()
    {
        m_pThread = new boost::thread(&ReceiveUDP::_ThreadFunction, this);
    }

    void _HandleReceiveFrom(
        const boost::system::error_code& error,
        size_t                                  received_bytes)
    {
        m_receive_semaphore.post();

        m_read_bytes = received_bytes;
    }

    void _ThreadFunction()
    {
        try
        {
            boost::array<char, 100> recv_buf;

            m_socket.open(udp::v4());
            m_socket.bind(m_local_endpoint);
            m_io_service.run();

            while (1)
            {
#if 1 // THIS WORKS

                m_read_bytes = m_socket.receive_from(
                    boost::asio::buffer(recv_buf), m_sender_endpoint);

#else // THIS DOESN'T WORK

                m_socket.async_receive_from(
                    boost::asio::buffer(recv_buf),
                    m_sender_endpoint,
                    boost::bind(&ReceiveUDP::_HandleReceiveFrom, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));

                /* The program locks on this wait since _HandleReceiveFrom
                is never called. */
                m_receive_semaphore.wait();

#endif

                std::cout.write(recv_buf.data(), m_read_bytes);
            }

            m_socket.close();
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }
};

void main()
{
    ReceiveUDP  receive_thread;

    receive_thread.m_pThread->join();
}

信号量上的 timed_wait 是首选,但是出于调试目的,我在上面的代码中使用了阻塞等待。

我错过了什么?我的错误在哪里?

4

2 回答 2

10

您的呼叫io_service.run()正在退出,因为没有工作io_service要做。然后代码进入while循环并调用m_socket.async_receive_from. 此时io_service它没有运行,因此它永远不会读取数据并调用您的处理程序。

您需要在调用 io_service 运行之前安排工作:

IE:

// Configure io service
ReceiveUDP  receiver;

m_socket.open(udp::v4());
m_socket.bind(m_local_endpoint);
m_socket.async_receive_from(
    boost::asio::buffer(recv_buf),
    m_sender_endpoint,
    boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred));

处理函数将执行以下操作:

// start the io service
void HandleReceiveFrom(
    const boost::system::error_code& error,
    size_t received_bytes)
{
    m_receive_semaphore.post();

    // schedule the next asynchronous read
    m_socket.async_receive_from(
        boost::asio::buffer(recv_buf),
        m_sender_endpoint,
        boost::bind(&ReceiveUDP::_HandleReceiveFrom, receiver,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));

    m_read_bytes = received_bytes;
}

然后,您的线程只需等待信号量:

while (1)
{
    m_receive_semaphore.wait();
    std::cout.write(recv_buf.data(), m_read_bytes);
}

笔记:

  1. 你真的需要这个额外的线程吗?处理程序是完全异步的,boost::asio 可用于管理线程池(参见:think-async
  2. 请不要在变量/函数名称中使用下划线后跟大写字母。它们是保留的。
于 2012-11-27T09:46:42.990 回答
0

m_io_service.run()立即返回,因此没有人调度完成处理程序。请注意,这io_service::run是一种基于 asio 的应用程序的“消息循环”,只要您希望 asio 功能可用,它就应该运行(这有点简化了描述,但对于您的情况来说已经足够了)。

此外,您不应在循环中调用 async.operation。相反,在前一个的完成处理程序中发出后续的 async.operation - 以确保 2 个 async.reads 不会同时运行。

查看 asio 示例以了解典型的 asio 应用程序设计。

于 2012-11-27T09:50:24.687 回答