我发现除了简单的情况外,NIO 的文档记录最多。即便如此,我已经完成了教程和几次重构,最终退回到最简单的情况,但我仍然偶尔会通过读取 0 字节的 SocketChannel 来触发 isReadable。并非每次执行都会发生。
我曾经在一个单独的线程中从附加对象调用读取,并认为这可能是竞争条件,但我已经开始在选择器的线程中进行读取,但问题仍然存在。我想它可能是我的测试客户端,但我不确定什么会不一致地触发它,因为客户端套接字在收到服务器的响应之前不应该关闭。
因此,在包含的代码中,此代码段发送的“hello”消息每次都能像我预期的那样正常
out.write("hello".getBytes()); out.write(EOT); out.flush();
在此之后,我偶尔会得到一个 0 长度的套接字通道。有时会从这个片段中得到正确的响应:
out.write(dataServerCredentials.getBytes()); out.write(EOT); out.flush();
对此的任何见解将不胜感激,它正在慢慢杀死我。我已经尝试在这里找到答案,但似乎相关的一个问题并没有真正说明我的问题。
提前致谢!
下面的代码片段:
选择方法:
public void execute()
{
initializeServerSocket();
for (;;)
{
try
{
System.out.println("Waiting for socket activity");
selector.select();
Iterator<SelectionKey> selectedKeys =
this.selector.selectedKeys().iterator();
while(selectedKeys.hasNext())
{
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid())
{
continue;
}
if (key.isAcceptable())
{ // New connection
// TODO: Create helper method for this that configures user info?
System.out.println("Accepting connection");
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel)key.channel();
SocketChannel socketChannel =
serverSocketChannel.accept();
socketChannel.socket().setSoTimeout(0);
socketChannel.configureBlocking(false);
SelectionKey newKey =
socketChannel.register(selector, SelectionKey.OP_READ);
// Create and attach an AuthProcessor to drive the states of this
// new Authentication request
newKey.attach(new AuthenticationRequestProcessor(newKey));
}
else if (key.isReadable())
{ // Socket has incoming communication
AuthenticationRequestProcessor authProcessor =
(AuthenticationRequestProcessor)key.attachment();
if (authProcessor == null)
{ // Cancel Key
key.channel().close();
key.cancel();
System.err.print("Cancelling Key - No Attachment");
}
else
{
if (authProcessor.getState() ==
AuthenticationRequestProcessor.TERMINATE_STATE)
{ // Cancel Key
key.channel().close();
key.cancel();
}
else
{ // Process new socket data
authProcessor.process(readStringFromKey(key));
}
}
}
}
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
读取方法(忽略这里的一些愚蠢,这是从另一个线程中拉出来的)
protected String readStringFromKey(SelectionKey key)
{
SocketChannel socketChannel = (SocketChannel)key.channel();
readBuffer.clear();
String message = null;
try
{
final int bytesRead = socketChannel.read(readBuffer);
if (-1 == bytesRead)
{ // Empty/Closed Channel
System.err.println("Error - No bytes to read on selected channel");
}
else
{ // Convert ByteBuffer into a String
System.out.println("Bytes Read: " + bytesRead);
readBuffer.flip();
message = byteBufferToString(readBuffer, bytesRead);
readBuffer.clear();
}
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
// Trim EOT off the end of the message
return message.trim();
}
客户端片段:
public void connect()
{
boolean connectionStatus = false;
String connectionHost = null;
int connectionPort = 0;
String connectionAuthKey = null;
try
{ // Login
authenticationSocket = new Socket(AUTH_HOST, AUTH_PORT);
out = authenticationSocket.getOutputStream();
in = new BufferedInputStream(authenticationSocket.getInputStream());
out.write("hello".getBytes());
out.write(EOT);
out.flush();
StringBuilder helloResponse = new StringBuilder();
// Read response off socket
int currentByte = in.read();
while (currentByte > -1 && currentByte != EOT)
{
helloResponse.append((char)currentByte);
currentByte = in.read();
}
outgoingResponses.offer(Plist.fromXml(helloResponse.toString()));
System.out.println("\n" + helloResponse.toString());
out.write(credentials.getBytes());
out.write(EOT);
out.flush();
// Read status
int byteRead;
StringBuilder command = new StringBuilder();
do
{
byteRead = in.read();
if (0 < byteRead)
{
if (EOT == byteRead)
{
Logger.logData(command.toString());
Map<String, Object> plist = Plist.fromXml(command.toString());
outgoingResponses.offer(plist);
// Connection info for Data Port
connectionStatus = (Boolean)plist.get(STATUS_KEY);
connectionHost = (String)plist.get(SERVER_KEY);
connectionPort = (Integer)plist.get(PORT_KEY);
connectionAuthKey = (String)plist.get(AUTH_KEY);
Logger.logData("Server =>" + plist.get("server"));
command = new StringBuilder();
}
else
{
command.append((char)byteRead);
}
}
}
while (EOT != byteRead);
}
catch (UnknownHostException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (XmlParseException e)
{
Logger.logData("Invalid Plist format");
e.printStackTrace();
}
finally
{ // Clean up handles
try
{
authenticationSocket.close();
out.close();
in.close();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Connection status =>" + connectionStatus);
System.out.println("Connection host =>" + connectionHost);
System.out.println("Connection port =>" + connectionPort);
if (connectionStatus)
{
dataServerHost = connectionHost;
dataServerPort = connectionPort;
dataServerAuthKey = connectionAuthKey;
System.out.println("Connecting to data server @: " + dataServerHost + ":" + dataServerPort);
connectToDataServer();
}
}