4

我正在使用教程构建一个没有可写部分的 java nio 服务器。

一切正常,除了一件有趣的事情:

  • 当客户端发送数据包太快时,服务器不会收到所有消息,服务器总是收到第一个和第二个数据包,但不会更多。
  • 如果客户端缓慢发送数据包,则服务器会获取所有数据包。

任何想法?

我正在添加服务器类代码,如果您需要下面代码中提到的另一个类,我在这里:)。

NIOServer 类:

package server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

import javax.xml.parsers.ParserConfigurationException;

import org.xml.sax.SAXException;

public class NioServer implements Runnable {



// The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  //the cach will hundle the messages that came
  private Cache cache;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException {
    this.cache = cache;
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }


  private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
      }

  private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        socketChannel.register(this.selector, SelectionKey.OP_READ);
      }

  private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
          numRead = socketChannel.read(this.readBuffer);
          String test = new String(this.readBuffer.array());
          System.out.print(test);

        } catch (IOException e) {
          // The remote forcibly closed the connection, cancel
          // the selection key and close the channel.
        //  key.cancel();
        //  socketChannel.close();
          return;
        }

        if (numRead == -1) {
          // Remote entity shut the socket down cleanly. Do the
          // same from our end and cancel the channel.
          key.channel().close();
          key.cancel();
          return;
        }

        // Hand the data off to our worker thread
        this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
      }

  public void run() {
        while (true) {
          try {
            // Wait for an event one of the registered channels

            this.selector.select();



            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
              SelectionKey key = (SelectionKey) selectedKeys.next();
              selectedKeys.remove();

              if (!key.isValid()) {
                continue;
              }

              // Check what event is available and deal with it
              if (key.isAcceptable()) {
                this.accept(key);
              } else if (key.isReadable()) {
                this.read(key);
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

  public static void main(String[] args) throws ParserConfigurationException, SAXException {
    try {
        Cache cache = new Cache();
        new Thread(cache).start();
      new Thread(new NioServer(null, 9090,cache)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
4

1 回答 1

0

如果您正在阅读 UDP,我希望如此。请注意您在该read方法上处理数据包的速度有多慢。您正在将它们打印到 system.out,这非常慢,而且不确定您能够以多快的速度将数据处理到该processData方法的另一个线程。如果这是您滞后的根源,我编写的这个库可以帮助您进行线程间非阻塞通信。您还应该检查底层读取套接字缓冲区的大小。它越大,在数据包开始被丢弃之前,您必须有更多的空间快速赶上。对于 TCP,如果底层套接字缓冲区已满,您可能会在通道上收到 IOException。对于 UDP,数据包被静默丢弃。

要访问底层读取套接字缓冲区大小,您可以执行以下操作:

final Socket socket = channel.socket();
System.out.println(socket.getReceiveBufferSize());
socket.setReceiveBufferSize(newSize);

注意:AFAIK,Linux 可能需要一些操作系统配置才能更改底层缓冲区大小。如果setReceiveBufferSize没有效果(再次阅读以查看是否已更改),请在 Google 上搜索。:)

于 2012-11-16T01:36:55.503 回答