1

我必须实现一个多播接收器,它能够加入多播组列表并使用 boost 在特定线程中处理接收到的数据。我确实尝试了以下代码......

boost::asio::io_service m_io_service;
boost::asio::ip::udp::socket m_multicast_socket(m_io_service);

// listen address
boost::asio::ip::address listen_address  
     = boost::asio::ip::address::from_string("0.0.0.0");

// listen port
unsigned short multicast_port = m_configuration->m_multicast_interface_port;

boost::asio::ip::udp::endpoint listen_endpoint( listen_address, multicast_port );

// open socket
m_multicast_socket.open( listen_endpoint.protocol() );

// set socket buffer size
m_multicast_socket.set_option( 
       boost::asio::ip::udp::socket::receive_buffer_size
               ( m_configuration->m_receiving_socket_buffer_size ) );

// other sockets could bind to listen_address
m_multicast_socket.set_option( boost::asio::ip::udp::socket::reuse_address(true) );

boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);

m_multicast_socket.io_control(num_of_bytes_readable);

m_multicast_socket.bind(listen_endpoint);


// joining a list of multicast group
for ( size_t i=0; i < multicast_groups.size(); ++i )
{
    boost::asio::ip::address multicast_address 
         = boost::asio::ip::address::from_string( multicast_groups[i] );

    m_multicast_socket.set_option( 
        boost::asio::ip::multicast::join_group(
            multicast_address ) );

    std::cout << multicast_groups[i] << " multicast group joined!" << std::endl;
}

然后无限循环读取数据......

while ( !m_exit )
{
    while ( !num_of_bytes_readable.get() )
    {
        boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
    }

    boost::asio::ip::udp::endpoint sender_endpoint;

    size_t bytes_received = m_multicast_socket.receive_from(
        boost::asio::buffer( m_reading_buffer.get(), m_configuration->m_reading_buffer_size )
            , sender_endpoint );

    if ( bytes_received > 0 )
    {
       // process
    }

    boost::this_thread::yield();
}

但是没有接收到数据并且循环......

while ( !num_of_bytes_readable.get() )
{
    boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
}

永远不会退出。

我也尝试了 boost asio 文档中的多播接收器示例的代码,但async_recv_from永远不会返回。

4

1 回答 1

10

需要进行一些更改:

  • 正如评论中所建议的,每个多播组使用一个套接字。
  • 如果发送方和接收方在同一台机器上,则验证该ip::multicast::enable_loopback选项是否为真。
  • socket_base::bytes_readable是一个 IO 控制命令。 socket.io_control在套接字上执行命令,并bytes_readable.get()返回命令的值。因此,每次都需要执行该命令来查询有多少字节可供读取。另一种更易读的解决方案是使用该socket.available()函数。

下面是一个简单的例子,演示socket_base::bytes_readable.

// Standard includes.
#include <iostream>

// 3rd party includes.
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>

void read(boost::asio::ip::udp::socket& socket)
{
  boost::asio::ip::udp::endpoint sender;
  std::vector<char> buffer;
  std::size_t bytes_readable = 0;
  for (int i=0; i < 3; ++i)
  {
    // Poll until data is available.
    while (!bytes_readable)
    {
      // Issue command to socket to get number of bytes readable.
      boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);
      socket.io_control(num_of_bytes_readable);

      // Get the value from the command.
      bytes_readable = num_of_bytes_readable.get();

      // If there is no data available, then sleep.
      if (!bytes_readable)
      {
        boost::this_thread::sleep(boost::posix_time::seconds(1));
      }
    }

    // Resize the buffer to store all available data.
    buffer.resize(bytes_readable);

    // Read available data.
    socket.receive_from(
      boost::asio::buffer(buffer, bytes_readable),
      sender);

    // Extract data from the buffer.
    std::string message(buffer.begin(), buffer.end());

    // Output data.
    std::cout << "Received message: ";
    std::cout << message << std::endl;
  }
}

void write(boost::asio::ip::udp::socket& socket,
           boost::asio::ip::udp::endpoint& destination)
{
  std::string message;
  for (unsigned int i=0; i < 3; ++i)
  {
    std::ostringstream stream;
    stream << i;
    message = stream.str();
    socket.send_to(boost::asio::buffer(message), destination);
    std::cout << "Sent message: " << message << std::endl;
  }
}

int main(int argc, char* argv[])
{
  // Extract command-line arguments.
  bool receiver = std::string(argv[1]) == "receive";
  boost::asio::ip::address address =
    boost::asio::ip::address::from_string(argv[2]);
  unsigned short port = boost::lexical_cast<unsigned short>(argv[3]);

  // Create socket.
  using boost::asio::ip::udp;
  boost::asio::io_service service;
  udp::socket socket(service);
  socket.open(boost::asio::ip::udp::v4());

  // Allow other processes to reuse the address, permitting other processes on
  // the same machine to use the multicast address.
  socket.set_option(udp::socket::reuse_address(true));

  // Guarantee the loopback is enabled so that multiple processes on the same
  // machine can receive data that originates from the same socket.
  socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
  socket.bind(
    udp::endpoint(boost::asio::ip::address_v4::any(),
    receiver ? port /* same as multicast port */
             : 0 /* any */));
  udp::endpoint destination(address, port);

  // Join group.
  namespace ip = boost::asio::ip;
  socket.set_option(ip::multicast::join_group(address));

  // Start read or write loops based on command line options.
  if (receiver) read(socket);
  else          write(socket, destination);
}

用法:

$ ./a.out receive 235.55.55.55 55555 &
$ sleep 1
$ ./a.out send 235.55.55.55 55555
Sent message: 0
Sent message: 1
Sent message: 2
Received message: 0
Received message: 1
Received message: 2
于 2012-10-05T15:46:53.817 回答