0

我正在尝试使用新的 Akka I/O 实现 Tcp 服务器,不幸的是文档还没有完成,我在 java 中实现它时遇到了一些问题 :(。我设法创建了一个客户端和服务器并发送从客户端到服务器的消息,但是您如何使用 ByteIterator 读取接收到的字节?我的方法有误吗?也许这不是您处理数据的方式。

当我非常快速地发送大量消息时会发生一些奇怪的事情,它们都排队到 bi,并且在我创建 it.getInt() 后数据永远不会重置事件

 if (msg instanceof Tcp.Received) {
   final Tcp.Received recv = (Tcp.Received) msg;
   final ByteString data = recv.data();
   ByteIterator bi = data.iterator();
   while(bi.hasNext()) {
      ....
   }
 } else if (msg instanceof Tcp.CommandFailed) {
   final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg;
   final Tcp.Command command = failed.cmd();
   // react to failed connect, bind, write, etc.
 } else if (msg instanceof Tcp.ConnectionClosed) {
   final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg;
   if (closed.isAborted()) {
      // handle close reasons like this
 }

解决方案:哦!现在我明白我的错误了,我在客户端缓存了 byteStringBuilder,却忘了清除它-_- 多么愚蠢!

对于那些想要Java实现示例的人来说,我是这样做的:

如果你们有任何优化我正在接受他们!:)

   int packetSize = 0;
   if (msg instanceof Tcp.Received) {
        final Tcp.Received recv = (Tcp.Received) msg;
        final ByteString data = recv.data();
        ByteIterator bi = data.iterator();
        while (bi.hasNext()) {
            packetSize = bi.getInt(ByteOrder.LITTLE_ENDIAN);
            Message m = Message.fromByteIterator(bi);
            getContext().parent().tell(m, null);
        }

    }

Message.fromByteIterator(bi) 将通过从 BI 获取 Ints、Floats、Bytes Arrays ... 来初始化一个新的消息对象

4

1 回答 1

0

我不完全确定我理解这个问题,但这里有两个问题可能导致了这个问题。

您收到的 ByteString 包含由 Tcp 传递的块。你得到的迭代器只会迭代那个块。它不是一个流,只是一个块。此外,Tcp 可以决定将您的字节流拆分到任何它想要的地方,因此您获得的块可能与您的消息边界不一致。你必须自己处理框架。

如果您发送太快并且网络跟不上,则可能会发生第二个问题。Akka IO 为此内置了流控制机制。最简单的情况是在每个 Write 上附加一个 Ack 对象,并且仅在前一个已被确认时才发送下一个写入。

于 2013-03-25T16:32:50.307 回答