1

我有 2 个客户端套接字绑定在同一个端口上侦听。

Selector readSelector = Selector.open();
DeviceUDPReader reader = new DeviceUDPReader();
reader.start();      

//The following is called for each of my 2 servers
void openDevice(String hostName, int hostPort, DriverHandler driverHandler) {
  socketAddress = new InetSocketAddress(hostName, hostPort);    
  writeSelector = Selector.open();
  channel = DatagramChannel.open();
  channel.configureBlocking(false);
  channel.socket().setReuseAddress(true);//Wonderful!! Many servers can respond to same port
  channel.socket().bind(new InetSocketAddress(localPort));// Controller sends data here
  channel.connect(socketAddress);
  channel.register(writeSelector, SelectionKey.OP_WRITE);
  registerChannel(channel, driverHandler);
}

//Register the channels with the read selector
void registerChannel(DatagramChannel channel, DriverHandler driverHandler) throws Exception {
  synchronized(pendingChanges) { 
    //Pending changes a linked list keeping track of changes to be made to Selector
    //ChangeRequest is a simple storage object
    pendingChanges.add(new ChangeRequest(channel, ChangeRequest.REGISTER, driverHandler));
  }
  readSelector.wakeup();
}

而选择器的运行方法如下

class DeviceUDPReader extends Thread {
DriverHandler driverHandler = null;
@Override
public void run() {
  int len;
  byte[] bb = {};
  ByteBuffer buffer = ByteBuffer.allocate(MAX_BYTES_PER_MESSAGE);
  while (true) {
    try {
      // Process any pending changes to selctor
      synchronized (pendingChanges) {
        Iterator<ChangeRequest> changes = pendingChanges.iterator();
        while (changes.hasNext()) {
          ChangeRequest change = (ChangeRequest) changes.next();
          switch (change.type) {
            case ChangeRequest.REGISTER:
              SelectionKey key = change.channel.register(readSelector, SelectionKey.OP_READ);                  
              key.attach(change.driverHandler);
              System.out.println("Registered channel" + change.channel.getRemoteAddress().toString());//both get registered
              break;
          }
        }
        pendingChanges.clear();
      }


      readSelector.select();//Blocks until at least one of the channels are ready
      System.err.println("TOTAL KEYS " + readSelector.keys().size());//==2

      Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
      System.out.println("Number of selected keys : " + readSelector.selectedKeys().size());//always 1
      while (it.hasNext()) {
        SelectionKey selKey = it.next();
        if (selKey.isValid() && selKey.isReadable()) {
          DatagramChannel sChannel = (DatagramChannel) selKey.channel();
          System.err.println("GOT SOMETHING FOR SELKEY " + sChannel.getRemoteAddress().toString());
          sChannel.receive(buffer);              
          buffer.flip();
          while ((len = buffer.remaining()) > 0) {                
            bb = new byte[len];
            buffer.get(bb, 0, len);
            System.err.println(new String(bb));
            driverHandler = ((DriverHandler)selKey.attachment());
            if (driverHandler != null) {
              driverHandler.readInput(new ByteArrayInputStream(bb, 0, len));
            }
          }
          buffer.clear();
        }
        it.remove();
      }
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  } // end while   
}

}

所以每个channel都有自己的write-write Selector,并共享一个read Selector。

我已经使用wireshark 进行了监控,发现数据在服务器A 和服务器B 之间正确发送和接收。我的读取选择器只从第一个选择器中获取消息,直到我完成并说channelA.close()。然后它立即获取从服务器 B 发送的数据。

任何想法我做错了什么?

4

0 回答 0