2

我正在编写 NIO2 服务器,我需要在 AsynchronousSocketChannel 上进行异步读取操作,这些操作中的每一个都包括读取一个整数,并进一步从相同的通道读取等于该整数的字节数。问题是,当我将两个或多个 CompletionHandler 连续放在通道上时(因为有多个读取操作的请求),并且这些处理程序中的第一个被触发,我在complete()第一个处理程序的方法中的进一步阅读代码无法正常工作,因为第二个当频道上有信息时,处理程序会立即被触发。我怎样才能阻止频道直到进一步阅读complete()没有Future事情完成?我不能使用 Future 因为我需要将处理程序放入套接字然后传递给其他任务。

for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) {
        try {
            final AsynchronousSocketChannel outSocket = entry.getValue();
            synchronized (outSocket) {
                final ByteBuffer buf = ByteBuffer.allocateDirect(9);
                outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey()));
            }

        } catch (Exception e) {

        }
    }

这是 DataServerResponseHandler 类:

class DataServerResponseHandler implements CompletionHandler<Integer, Void> {

    private ConcurrentHashMap<String, Boolean> resultTable = null;
    private AsynchronousSocketChannel channel = null;
    private TcpServer server;
    private String ip;
    private ByteBuffer msg;

    public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel,
            ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) {
        this.msg = msg;
        this.channel = channel;
        this.resultTable = resultTable;
        this.server = server;
        this.ip = ip;
    }

    @Override
    public void completed(Integer result, Void attachment) {
            try {
                msg.rewind();
                int resultCode = msg.get() & 0xff;
                int ipOne = msg.get() & 0xff;
                int ipTwo = msg.get() & 0xff;
                int ipThree = msg.get() & 0xff;
                int ipFour = msg.get() & 0xff;
                int length = msg.getInt();
                msg.rewind();
                ByteBuffer buf = ByteBuffer.allocateDirect(length);
                channel.read(buf).get();
                buf.rewind();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    @Override
    public void failed(Throwable exc, Void attachment) {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

}
4

1 回答 1

1

这段代码有几个问题。
第一次读取不保证它会读取所有剩余的字节,但只要读取至少一个字节就会调用完成处理程序。因此,您必须检查缓冲区的位置并重新调用读取,直到您有 9 个字节的标头或有效负载的长度。

if (msg.position() < 9) {
    channel.read(msg, null, this);
    return;
}

对于第二部分,为了继续异步方法,再次运行带有完成处理程序的读取。您可以创建一个专门处理有效负载的新的或重用现有的,并且您必须记住状态:

switch (state) {
case READ_HEADER:
    if (msg.remaining() > 0) {
        channel.read(msg, null, this);
        return;
    }
    // do the parsing the IP and length
    state = READ_PAYLOAD;
    channel.read(payloadBuf, null, this);
    break;

case READ_PAYLOAD:
    if (payloadBuf.remaining() > 0) {
        channel.read(payloadBuf, null, this);
        return;
    }
    payloadBuf.flip();
    // get content from payloadBuf
    break;
}
于 2015-10-09T13:57:57.100 回答