8

When writing 2048bytes in on handler, the messageRevieved method should be called twice to receive the all data... how I can receive the 2048bytes data in

Code

Server:

public class Server{
    public static void main(String[] args){
        ChannelFactory factory=new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ServerBootstrap bootstrap=new ServerBootstrap(factory);
        bootstrap.setPipelineFactory(new CarPipelineFactory());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(8989));
    }
}

Server Handler:

public class ServerHandler extends SimpleChannelHandler{

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e){
        byte[] resp=data.getBytes();//data is a String greater than 1024bytes;
        ChannelBuffer buffer=ChannelBuffers.buffer(resp.length);
        buffer.writerBytes(resp);
        e.getChannel().write(buffer);
        buffer.clear();
    }
}

Client:

public class Client{
    public static void main(String[] args){
        ChannelFactory channelFactory=new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ClientBootstrap bootstrap=new ClientBootstrap(channelFactory);
        bootstrap.getPipeline().addLast("handler", new PhoneClientHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.connect(new InetSocketAddress("127.0.0.1",8181));
    }
}

Client Handler:

public class ClientHandler extends SimpleChannelHandler{
    public void messageRecieved(ChannelHandlerContext ctx, ChannelStateEvent e){
        ChannelBuffer buffer=(ChannelBuffer)e.getMessage();
        int size=buffer.readableBytes();
        byte[] bytes=new byte[size];
        buffer.readBytes(bytes);
        buffer.clear();
        System.out.println(new String(bytes));//if the data size>1024,the String will speprate into parts.
    }
}
4

6 回答 6

6

好吧,您总是可以决定一次写入多少字节,但您绝对不知道何时接收到多少字节(这就是 NIO 有意义的原因)。您需要处理自己的缓冲区以接收所需的固定数量的字节。为此,您可以使用为此目的设计的FrameDecoder 。

此外,您可以通过将tcpNoDelay设置为 true来确保数据不会在发送方套接字缓冲区中停留太久,因此在物理发送数据之前不会等待当前“帧”达到某个临界大小。

如果我理解得很好,您一方面说 2048 字节,但另一方面在 messagedReceived 事件中未收到所有数据?尝试检查这些常见问题:

  • 您的应用程序终止过早且数据尚未到达
  • 您的数据卡在“发送方”的套接字缓冲区中,因为您没有关闭Channel 并且tcpNoDelay选项未设置为 true。这导致套接字在发送数据包之前等待一些额外的字节。
  • 您没有读取 ChannelBuffer 内的所有数据,但出于某种原因,将readerIndex设置为更远的位置

尝试向我们展示您的部分代码,它应该会让事情变得更容易......

添加于 2012 年 4 月 17 日

如果我了解您正在尝试将字符串的字节数组编码从发送者传递给接收者。这是您进行小重构后的代码:

- - - - - - - - - - - - - - 代码 - - - - - - - - - - - --------写手:response.size()>1024bytes

byte[] datas = ((String)msg).getBytes("UTF-8"); //ALWAYS SPECIFY THE ENCODING
ChannelBuffer buffer = ChannelBuffers.wrap(datas); //USE DIRECTLY THE ARRAY
System.out.println(buffer);    //buffer'size>1024 here
channel.write(buffer);

----------------------------receive hand: 应该收到两次,println() 会执行两次

ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); 
System.out.println(buffer)    //buffer'size once 1024,once the remainder size
byte[] datas =buffer.readBytes(buffer.readableBytes()).array()
String msg=new String(datas , "UTF-8"); //BAD IDEA because the bytes sequence of the last UTF-8 char could be uncompleted there
System.out.println(str);

这不是这样做的方法,您应该直接使用包org.jboss.netty.handler.codec.string中的StringEncoderStringDecoder。它将为您处理框架问题。如果你还想调试你的代码,请使用 Netty 提供的LoggingHandler。你也真的设置了这个选项:

bootstrap.setOption("tcpNoDelay", true);

在双方引导?

于 2012-04-16T12:30:12.263 回答
3

尝试在您TruncatedChannelBuffer的. 我认为它会起作用..或者如果它不起作用,请发布生成异常的堆栈跟踪。我在我的代码中尝试了这个并且它有效..我希望这会对你有所帮助。BigEndianHeapChannelBufferClientHandler

public void messageReceived(ChannelHandlerContext channelHandlerContext,MessageEvent messageEvent) throws Exception {

    Object messageObject = messageEvent.getMessage();

    // if size of message < 1024 then TruncatedChannelBuffer is returned.

    if (messageObject instanceof TruncatedChannelBuffer) {

        try {

            TruncatedChannelBuffer truncatedChannelBuffer = (TruncatedChannelBuffer) messageObject;

            byte[] byteArray = new byte[truncatedChannelBuffer.readableBytes()];

            truncatedChannelBuffer.readBytes(byteArray);

            System.out.print(" Message = "+new String(byteArray));

            truncatedChannelBuffer.clear();

        } catch (Exception e) {

            System.out.println("Exception in MessageReceived...");

            e.printStackTrace();


        }
    }
    // if size of message > 1024 then BigEndianHeapChannelBuffer is returned.

    if (messageObject instanceof BigEndianHeapChannelBuffer) {

        try {

            BigEndianHeapChannelBuffer bigEndianHeapChannelBuffer = (BigEndianHeapChannelBuffer) messageObject;

            byte[] byteArray  = new byte[bigEndianHeapChannelBuffer.readableBytes()];

            bigEndianHeapChannelBuffer.readBytes(byteArray);

            System.out.print(" Message = "+new String(byteArray));

            bigEndianHeapChannelBuffer.clear();


        } catch (Exception e) {

            System.out.println("Exception in MessageReceived...");

            e.printStackTrace();

        }
    }

}       
于 2012-07-07T08:07:28.850 回答
2

首先,对于客户端,引导选项不应以'child'开头:

bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);

此外,您不要在客户端和服务器上使用相同的端口!

其次,您没有“关闭”策略:假设您的客户何时知道其工作已完成?你如何防止线程过早结束?你应该做这个

服务器处理程序

public class ServerHandler extends SimpleChannelHandler{

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e){
        byte[] resp=data.getBytes();//data is a String greater than 1024bytes;
        ChannelBuffer buffer=ChannelBuffers.buffer(resp.length);
        buffer.writerBytes(resp);
        e.getChannel().write(buffer);
        buffer.clear();
        e.getChannel.close();
    }
}

客户端引导

public class Client{
    public static void main(String[] args){
        ChannelFactory channelFactory=new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());
        ClientBootstrap bootstrap=new ClientBootstrap(channelFactory);
        bootstrap.getPipeline().addLast("handler", new PhoneClientHandler());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1",8181));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
    }
}

最后,您需要通过阅读大量示例来更好地理解您在做什么。它们可以在主捆绑下载的org.jboss.netty.example包中找到。

于 2012-04-18T09:53:35.337 回答
1

RenaudBlue@ 提出了很好的观点。此外,我建议切换到 Netty4,它使所有 ByteBufs 动态化,并且使分块读/写更易于管理。请参阅“移植客户端”

例如,

private void sendNumbers() {
  // Do not send more than 4096 numbers.
  boolean finished = false;
  MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
  while (out.size() < 4096) {
      if (i <= count) {
          out.add(Integer.valueOf(i));
          i ++;
      } else {
          finished = true;
          break;
      }
  }

  ChannelFuture f = ctx.flush();
  if (!finished) {
      f.addListener(numberSender);
  }
}

private final ChannelFutureListener numberSender = new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
          sendNumbers();
      }
  }
};

Netty4 还具有通道选项配置的类型安全性,这可以防止"child.tcpNoDelay"错误。

但 Netty4 的最大胜利是定义良好的线程模型,这使得 Netty易于使用。

于 2013-03-31T17:35:18.887 回答
1

我有同样的问题,尝试使用 Oio 而不是 Nio!(只需将“nio”更改为“oio”,将“Nio”更改为“Oio”。

http://lists.jboss.org/pipermail/netty-users/2009-June/000891.html

于 2013-01-15T11:59:38.000 回答
0

您需要按FixedRecvByteBufAllocator以下SocketChannel方式设置childHandler()

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2 * 1024));
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 5));
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new StringDecoder());
                ...
            }
        });
于 2018-01-04T07:37:49.430 回答