2

我正在尝试通过 boost::asio 发送原始数据,因为 boost::serialization 对我的需要来说太慢了。按照各种示例和提升文档,我有一个客户:

模拟客户端:

 void SimulationClient::sendData(std::vector<WaveformDefinition>waveformPackets) {
        socket.async_send_to(boost::asio::buffer(waveformPackets),
                receiver_endpoint,
                boost::bind(&ClientEnvironmentEngine::sendComplete, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
}

我在下面尝试了 Tanner Sansbury 的解决方案,但无法使其正常工作。但是,我使用以下方法取得了成功:

class WaveformReceiver {
     WaveformDefinition *buffer;

     WaveformReceiver(){
         buffer = new WaveformDefinition[MAX_WAVEFORMS];
         startReceive();
     }

     void startReceive() {
         socket_.async_receive_from(boost::asio::null_buffers(), remote_endpoint_,
               boost::bind(&WaveformReceiver::handleReceive, this,
               boost::asio::placeholders::error,
               boost::asio::placeholders::bytes_transferred));
     }

     void handleReceive(const boost::system::error_code& error,
        std::size_t size/*bytes_transferred*/)
     {
          if (!error)
          {
               int available = socket_.available();
               int numWaveforms = available / sizeof(WaveformDefinition_c);
               socket_.receive(boost::asio::buffer(buffer, available));

               //copy buffer into another buffer so we can re-use the original buffer for the next read
               WaveformDefinition_c* tempBuffer = new WaveformDefinition_c[numWaveforms];
               std::memcpy ( tempBuffer, buffer, available );

               //schedule a thread to handle the array of waveforms that we copied
               threadPool.schedule( boost::bind( handleWaveforms, tempBuffer, numWaveforms));
               //start listening for more waveforms
               startReceive();
          }
     }
}

Tanner 或其他人,你能告诉我我正在做的事情是否也应该起作用,或者我是否很幸运它目前正在工作?

4

2 回答 2

3

问题的基本部分是关于序列化和反序列化集合。

在不控制服务器和客户端的编译器和体系结构的情况下,发送原始结构通常是不安全的,因为系统之间的字节表示可能不同。虽然在这种特定情况下编译器和架构是相同的,但它们#pragma pack(1)并不相关,因为WAVEFORM_DATA_STRUCT它没有作为原始内存写入套接字。write相反,为收集操作提供了多个内存缓冲区。

boost::array<boost::asio::mutable_buffer,2> buffer = {{
  boost::asio::buffer(&waveformPacket->numWaveforms, ...), // &numWaveforms
  boost::asio::buffer(waveformPacket->waveforms)           // &waveforms[0]
}};

有各种工具可以帮助序列化数据结构,例如Protocol Buffers


下面的代码将演示序列化网络通信数据结构的基础知识。为了简化代码和解释,我选择专注于序列化和反序列化,而不是从套接字写入和读取。本节下方的另一个示例将展示更多原始方法,它假定相同的编译器和体系结构。

从基本foo类型开始:

struct foo
{
  char a;
  char b;
  boost::uint16_t c;
};

可以确定数据可以打包成总共4个字节。以下是一种可能的电线表示:

0        8       16       24       32
|--------+--------+--------+--------|
|   a    |   b    |        c        |
'--------+--------+--------+--------'

确定线路表示后,可以使用两个函数将foo对象序列化(保存)到缓冲区,另一个函数可用于foo从缓冲区反序列化(加载)。由于foo.c大于一个字节,函数还需要考虑字节序。我选择在 Boost.Asio 详细命名空间中使用字节序字节交换函数来实现某些平台中立性。

/// @brief Serialize foo into a network-byte-order buffer.
void serialize(const foo& foo, unsigned char* buffer)
{
  buffer[0] = foo.a;
  buffer[1] = foo.b;

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::host_to_network_short;
  boost::uint16_t c = host_to_network_short(foo.c);
  std::memcpy(&buffer[2], &c, sizeof c);
}

/// @brief Deserialize foo from a network-byte-order buffer.
void deserialize(foo& foo, const unsigned char* buffer)
{
  foo.a = buffer[0];
  foo.b = buffer[1];

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::network_to_host_short;
  boost::uint16_t c;
  std::memcpy(&c, &buffer[2], sizeof c);
  foo.c = network_to_host_short(c);
}

为 完成序列化和反序列化后foo,下一步是处理foo对象集合。在编写代码之前,需要确定线路表示。在这种情况下,我决定foo用 32 位计数字段作为元素序列的前缀。

0        8       16       24       32
|--------+--------+--------+--------|
|       count of foo elements [n]   |
|--------+--------+--------+--------|
|         serialized foo [0]        |
|--------+--------+--------+--------|
|         serialized foo [1]        |
|--------+--------+--------+--------|
|                ...                |
|--------+--------+--------+--------|
|         serialized foo [n-1]      |
'--------+--------+--------+--------'

再一次,可以引入两个辅助函数来序列化和反序列化foo对象集合,并且还需要考虑计数字段的字节顺序。

/// @brief Serialize a collection of foos into a network-byte-order buffer.
template <typename Foos>
std::vector<unsigned char> serialize(const Foos& foos)
{
  boost::uint32_t count = foos.size();

  // Allocate a buffer large enough to store:
  //   - Count of foo elements.
  //   - Each serialized foo object.
  std::vector<unsigned char> buffer(
      sizeof count +            // count
      foo_packed_size * count); // serialize foo objects

  // Handle endianness for size.
  using ::boost::asio::detail::socket_ops::host_to_network_long;
  count = host_to_network_long(count);

  // Pack size into buffer.
  unsigned char* current = &buffer[0];
  std::memcpy(current, &count, sizeof count);
  current += sizeof count; // Adjust position.

  // Pack each foo into the buffer.
  BOOST_FOREACH(const foo& foo, foos)
  {
    serialize(foo, current);
    current += foo_packed_size; // Adjust position.
  }

  return buffer;
};

/// @brief Deserialize a buffer into a collection of foo objects.
std::vector<foo> deserialize(const std::vector<unsigned char>& buffer)
{
  const unsigned char* current = &buffer[0];

  // Extract the count of elements from the buffer.
  boost::uint32_t count;
  std::memcpy(&count, current, sizeof count);
  current += sizeof count;

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::network_to_host_long;
  count = network_to_host_long(count);

  // With the count extracted, create the appropriate sized collection.
  std::vector<foo> foos(count);

  // Deserialize each foo from the buffer.
  BOOST_FOREACH(foo& foo, foos)
  {
    deserialize(foo, current);
    current += foo_packed_size;
  }

  return foos;
};

这是完整的示例代码:

#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/detail/socket_ops.hpp> // endian functions
#include <boost/cstdint.hpp>
#include <boost/foreach.hpp>
#include <boost/tuple/tuple.hpp>            // boost::tie
#include <boost/tuple/tuple_comparison.hpp> // operator== for boost::tuple

/// @brief Mockup type.
struct foo
{
  char a;
  char b;
  boost::uint16_t c;
};

/// @brief Equality check for foo objects.
bool operator==(const foo& lhs, const foo& rhs)
{
  return boost::tie(lhs.a, lhs.b, lhs.c) ==
         boost::tie(rhs.a, rhs.b, rhs.c);
}

/// @brief Calculated byte packed size for foo.
///
/// @note char + char + uint16 = 1 + 1 + 2 = 4
static const std::size_t foo_packed_size = 4;

/// @brief Serialize foo into a network-byte-order buffer.
///
/// @detail Data is packed as follows:
///
///   0        8       16       24       32
///   |--------+--------+--------+--------|
///   |   a    |   b    |        c        |
///   '--------+--------+--------+--------'
void serialize(const foo& foo, unsigned char* buffer)
{
  buffer[0] = foo.a;
  buffer[1] = foo.b;

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::host_to_network_short;
  boost::uint16_t c = host_to_network_short(foo.c);
  std::memcpy(&buffer[2], &c, sizeof c);
}

/// @brief Deserialize foo from a network-byte-order buffer.
void deserialize(foo& foo, const unsigned char* buffer)
{
  foo.a = buffer[0];
  foo.b = buffer[1];

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::network_to_host_short;
  boost::uint16_t c;
  std::memcpy(&c, &buffer[2], sizeof c);
  foo.c = network_to_host_short(c);
}

/// @brief Serialize a collection of foos into a network-byte-order buffer.
///
/// @detail Data is packed as follows:
///
///   0        8       16       24       32
///   |--------+--------+--------+--------|
///   |       count of foo elements [n]   |
///   |--------+--------+--------+--------|
///   |         serialized foo [0]        |
///   |--------+--------+--------+--------|
///   |         serialized foo [1]        |
///   |--------+--------+--------+--------|
///   |                ...                |
///   |--------+--------+--------+--------|
///   |         serialized foo [n-1]      |
///   '--------+--------+--------+--------'
template <typename Foos>
std::vector<unsigned char> serialize(const Foos& foos)
{
  boost::uint32_t count = foos.size();

  // Allocate a buffer large enough to store:
  //   - Count of foo elements.
  //   - Each serialized foo object.
  std::vector<unsigned char> buffer(
      sizeof count +            // count
      foo_packed_size * count); // serialize foo objects

  // Handle endianness for size.
  using ::boost::asio::detail::socket_ops::host_to_network_long;
  count = host_to_network_long(count);

  // Pack size into buffer.
  unsigned char* current = &buffer[0];
  std::memcpy(current, &count, sizeof count);
  current += sizeof count; // Adjust position.

  // Pack each foo into the buffer.
  BOOST_FOREACH(const foo& foo, foos)
  {
    serialize(foo, current);
    current += foo_packed_size; // Adjust position.
  }

  return buffer;
};

/// @brief Deserialize a buffer into a collection of foo objects.
std::vector<foo> deserialize(const std::vector<unsigned char>& buffer)
{
  const unsigned char* current = &buffer[0];

  // Extract the count of elements from the buffer.
  boost::uint32_t count;
  std::memcpy(&count, current, sizeof count);
  current += sizeof count;

  // Handle endianness.
  using ::boost::asio::detail::socket_ops::network_to_host_long;
  count = network_to_host_long(count);

  // With the count extracted, create the appropriate sized collection.
  std::vector<foo> foos(count);

  // Deserialize each foo from the buffer.
  BOOST_FOREACH(foo& foo, foos)
  {
    deserialize(foo, current);
    current += foo_packed_size;
  }

  return foos;
};

int main()
{
  // Create a collection of foo objects with pre populated data.
  std::vector<foo> foos_expected(5);
  char a = 'a',
       b = 'A';
  boost::uint16_t c = 100;

  // Populate each element. 
  BOOST_FOREACH(foo& foo, foos_expected)
  {
    foo.a = a++;
    foo.b = b++;
    foo.c = c++;
  }

  // Serialize the collection into a buffer.
  std::vector<unsigned char> buffer = serialize(foos_expected);

  // Deserialize the buffer back into a collection.
  std::vector<foo> foos_actual = deserialize(buffer);

  // Compare the two.
  std::cout << (foos_expected == foos_actual) << std::endl; // expect 1

  // Negative test.
  foos_expected[0].c = 0;
  std::cout << (foos_expected == foos_actual) << std::endl; // expect 0
}

这会产生 和 的预期1结果0


如果使用相同的编译器和架构,则可以将foo原始缓冲区中的连续对象序列重新解释为对象数组,并使用复制构造函数进行foo填充。std::vector<foo>例如:

// Create and populate a contiguous sequence of foo objects.
std::vector<foo> foo1;
populate(foo1);

// Get a handle to the contiguous memory block.
const char* buffer = reinterpret_cast<const char*>(&foo1[0]);

// Populate a new vector via iterator constructor.  
const foo* begin = reinterpret_cast<const foo*>(buffer);
std::vector<foo> foos2(begin, begin + foos1.size());

最后,foo1应该等于foo2。中的foo对象foo2将从.foo拥有的内存中重新解释的对象复制构造foo1

#include <iostream>
#include <vector>
#include <boost/cstdint.hpp>
#include <boost/foreach.hpp>
#include <boost/tuple/tuple.hpp>            // boost::tie
#include <boost/tuple/tuple_comparison.hpp> // operator== for boost::tuple

/// @brief Mockup type.
struct foo
{
  char a;
  char b;
  boost::uint16_t c;
};

/// @brief Equality check for foo objects.
bool operator==(const foo& lhs, const foo& rhs)
{
  return boost::tie(lhs.a, lhs.b, lhs.c) ==
         boost::tie(rhs.a, rhs.b, rhs.c);
}

int main()
{
  // Create a collection of foo objects with pre populated data.
  std::vector<foo> foos_expected(5);
  char a = 'a',
       b = 'A';
  boost::uint16_t c = 100;

  // Populate each element. 
  BOOST_FOREACH(foo& foo, foos_expected)
  {
    foo.a = a++;
    foo.b = b++;
    foo.c = c++;
  }

  // Treat the collection as a raw buffer.
  const char* buffer =
      reinterpret_cast<const char*>(&foos_expected[0]);

  // Populate a new vector.  
  const foo* begin = reinterpret_cast<const foo*>(buffer);
  std::vector<foo> foos_actual(begin, begin + foos_expected.size());

  // Compare the two.
  std::cout << (foos_expected == foos_actual) << std::endl; 

  // Negative test.
  foos_expected[0].c = 0;
  std::cout << (foos_expected == foos_actual) << std::endl;
}

与其他方法一样,这会产生 和 的预期1结果0

于 2013-09-26T18:30:17.967 回答
0

首先,它使用起来不安全pragma pack(1)。打包可能与不同的编译器/架构不同。此外,您将在协议更改方面遇到问题。我建议改用google protobuf

第二。您正在发送std::vector,但该向量的实际数据不在结构内部WAVEFORM_DATA_STRUCT(向量将其数据保存在堆中)。因此,您将向量及其指向堆的指针发送到另一台机器,该指针在哪里肯定是无效的。你需要以某种方式序列化你的向量。

PS 与 boost::asio 无关,这个问题是关于正确的序列化/反序列化。

于 2013-09-26T16:16:34.953 回答