
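I'm trying to write a non-blocking client and a non-blocking server with the following requirements:

  • The server just listens for clients and sends back whatever it receives from them
  • The client can send messages to the server at any time, and can send more than once, but only through a single SocketChannel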

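I read this tutorial: http://rox-xmlrpc.sourceforge.net/niotut/index.html , which implements the server as follows:

  1. Initialize the selector
  2. Wait for an acceptable SelectionKey from a client
  3. Then wait for a readable SelectionKey and read data from it (when socketChannel.read returns -1, close the socketChannel)
  4. After that, send the data back to the client through a writable SelectionKey

I also looked at this tutorial: https://forums.oracle.com/forums/thread.jspa?threadID=1145909&tstart=2040 , but it was too hard for me to follow.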

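So I rewrote my own code based on the ROX tutorial. Here is my code: http://www.mediafire.com?o30yvtp5kqpya8b (it is almost entirely based on the ROX tutorial's code). Because it's hard to post all of the code here, I uploaded my project to mediafire so you can download it and import it into Eclipse to browse the code easily.

You can run MyServer.java in package server, then MyClient.java in package client_test (ignore package client).

After running the server and the client, you will see that the server receives only the first message from the client, but it should receive 2 messages. I know something is wrong with my implementation, but I don't know why or how to fix it.

Any suggestions for fixing my code, or for another solution that meets my requirements, would be greatly appreciated. Thanks, everyone.

OK, I'll post the relevant parts of the code here:

My client:

MyClient.java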

public class MyClient implements Runnable {
    // The host:port combination to connect to
    private InetAddress hostAddress;
    private int port;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(8);

    // A list of PendingChange instances
    private List pendingChanges = new LinkedList();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map pendingData = new HashMap();

    // Maps a SocketChannel to a RspHandler
    private Map rspHandlers = Collections.synchronizedMap(new HashMap());


    private SocketChannel socket;
    private static MyResponseHandler handler;   

    public MyClient(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();

        handler = new MyResponseHandler();      
    }

    public void send(byte[] data, MyResponseHandler handler) throws IOException {
        System.out.println("------------ send() ---- BEGIN");

        // Register the response handler
        this.rspHandlers.put(socket, handler);  

        // And queue the data we want written
        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socket);
            if (queue == null) {
                queue = new ArrayList();
                this.pendingData.put(socket, queue);
            }
            queue.add(ByteBuffer.wrap(data));
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                            break;
                        case ChangeRequest.REGISTER:
                            change.socket.register(this.selector, change.ops);
                            break;
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");
                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isConnectable()) {
                        this.finishConnection(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("------------ while in run() ---- END");
        }
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.channel().close();
            key.cancel();       
            return;
        }

        // Handle the response
        this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
        System.out.println("------------ read() ---- END");
    }

    private void handleResponse(SocketChannel socketChannel, byte[] data, int numRead) throws IOException {
        System.out.println("------------ handleResponse() ---- BEGIN");
        // Make a correctly sized copy of the data before handing it
        // to the client
        byte[] rspData = new byte[numRead];
        System.arraycopy(data, 0, rspData, 0, numRead);

        // Look up the handler for this channel
        MyResponseHandler handler = (MyResponseHandler) this.rspHandlers.get(socketChannel);

        // And pass the response to it
        if (handler.handleResponse(rspData)) {
            // The handler has seen enough, close the connection
            socketChannel.close();
            socketChannel.keyFor(this.selector).cancel();
        }
        System.out.println("------------ handleResponse() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List queue = (List) this.pendingData.get(socketChannel);

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private void finishConnection(SelectionKey key) throws IOException {
        System.out.println("------------ finishConnection() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Finish the connection. If the connection operation failed
        // this will raise an IOException.
        try {
            socketChannel.finishConnect();
        } catch (IOException e) {
            // Cancel the channel's registration with our selector
            System.out.println(e);
            key.cancel();
            return;
        }

        // Register an interest in writing on this channel
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("------------ finishConnection() ---- END");
    }

    private void initiateConnection() throws IOException {
        System.out.println("------------ initiateConnection() ---- BEGIN");
        // Create a non-blocking socket channel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);

        // Kick off connection establishment
        socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));

        // Queue a channel registration since the caller is not the 
        // selecting thread. As part of the registration we'll register
        // an interest in connection events. These are raised when a channel
        // is ready to complete connection establishment.
        synchronized(this.pendingChanges) {
            this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
        }

        System.out.println("------------ initiateConnection() ---- END");
        socket = socketChannel;
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        return SelectorProvider.provider().openSelector();
    }

    public static void main(String[] args) {
        try {
            MyClient client = new MyClient(InetAddress.getByName("127.0.0.1"),
                    9090);
            Thread t = new Thread(client);
            t.setDaemon(true);
            t.start();

            // Start a new connection
            client.initiateConnection();            

            // 1st
            client.send("hehe|||".getBytes(), handler);
            System.out.println("SEND: " + "hehe|||");
            handler.waitForResponse();

            System.out.println("==========================================================");

            // 2nd
            client.send(("2 hehe|||").getBytes(), handler);
            System.out.println("SEND: " + "2 hehe|||");
            handler.waitForResponse();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MyResponseHandler.java

public class MyResponseHandler {
    private byte[] rsp = null;

    public synchronized boolean handleResponse(byte[] rsp) {
        this.rsp = rsp;
        this.notify();
        return true;
    }

    public synchronized void waitForResponse() {
        while(this.rsp == null) {
            try {
                System.out.println("--waiting...");
                this.wait();
                System.out.println("--done!!!");
            } catch (InterruptedException e) {}
        }

        System.out.println("RECEIVE: " + new String(this.rsp));

        /**
         *  Set @rsp = null to let the block inside the above while loop
         *  will be run again 
         */
        rsp = null;
    }
}

MyServer.java

public class MyServer implements Runnable {
    // CONSTANT
    private final static int BUFFER_SIZE = 8;

    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

    private RequestCollector requestCollector;

    // A list of PendingChange instances
    private List<ChangeRequest> pendingChanges = 
            new LinkedList<ChangeRequest>();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map<SocketChannel, List<ByteBuffer>> pendingData = 
            new HashMap<SocketChannel, List<ByteBuffer>>();

    public MyServer(InetAddress hostAddress, int port, 
            RequestCollector requestCollector) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.requestCollector = requestCollector;
    }

    public void send(SocketChannel socket, byte[] data) {
        System.out.println("------------ send() ---- BEGIN");
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket,
                    ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = 
                        (List<ByteBuffer>) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList<ByteBuffer>();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required 
        // changes
        this.selector.wakeup();
        System.out.println("------------ send() ---- END");
    }

    public void run() {
        while (true) {
            System.out.println("------------ while in run() ---- BEGIN");
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator<ChangeRequest> changes = 
                            this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        System.out.println("CHANGE!!!!!!!!!!!!!!!!!");
                        ChangeRequest change = (ChangeRequest) changes.next();
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = 
                                change.socket.keyFor(this.selector);
                            key.interestOps(change.ops);
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();
                System.out.println("^^^^^^^^^^^^^^^^^^^^^^^^");

                // Iterate over the set of keys for which events are available
                Iterator<SelectionKey> selectedKeys = 
                        this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    System.out.println("There's something in this while loop");
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        System.out.println("key is invalid");
                        continue;
                    }

                    // Check what event is available and deal with it
                    if (key.isAcceptable()) {
                        this.accept(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("------------ while in run() ---- END");
        }
    }

    private void accept(SelectionKey key) throws IOException {
        System.out.println("------------ accept() ---- BEGIN");
        // For an accept to be pending the channel must be a server socket 
        // channel.
        ServerSocketChannel serverSocketChannel = 
                (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        //Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        SelectionKey readKey = 
                socketChannel.register(this.selector, SelectionKey.OP_READ);

        // Attach a StringBuilder to this SocketChannel
        readKey.attach( new StringBuilder() );

        // DEBUG    
        System.out.println(socketChannel.socket().getInetAddress() + " - "
                + socketChannel.socket().getPort());
        System.out.println("------------ accept() ---- END");
    }

    private void read(SelectionKey key) throws IOException {
        System.out.println("------------ read() ---- BEGIN");
        // Get socket channel
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Get attached StringBuilder
        StringBuilder currentMessage = (StringBuilder) key.attachment();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The remote forcibly closed the connection, cancel
            // the selection key and close the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            key.cancel();
            socketChannel.close();
            return;
        }

        // Hand the data off to our requestCollector thread
        this.requestCollector.processData(this, socketChannel, 
                this.readBuffer.array(), numRead, currentMessage);
        System.out.println("------------ read() ---- END");
    }

    private void write(SelectionKey key) throws IOException {
        System.out.println("------------ write() ---- BEGIN");
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List<ByteBuffer> queue = 
                    (List<ByteBuffer>) this.pendingData.get(socketChannel);

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }

            if (queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
        System.out.println("------------ write() ---- END");
    }

    private Selector initSelector() throws IOException {
        System.out.println("------------ initSelector() ---- BEGIN");
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, 
                this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        System.out.println("------------ initSelector() ---- END");
        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            RequestCollector requestCollector = new RequestCollector();
            new Thread(requestCollector).start();
            new Thread(new MyServer(null, 9090, requestCollector)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

If you want to see more of the code, you can download the zip file. When the server and the client are run, the debug output is:

Server

------------ initSelector() ---- BEGIN
------------ initSelector() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ accept() ---- BEGIN
/127.0.0.1 - 46553
------------ accept() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
RECEIVE: hehe|||
------------ send() ---- BEGIN
------------ send() ---- END
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN
SEND: hehe|||
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ while in run() ---- END
------------ while in run() ---- BEGIN

Client

------------ initiateConnection() ---- BEGIN
------------ initiateConnection() ---- END
------------ send() ---- BEGIN
------------ send() ---- END
SEND: hehe|||
--waiting...
------------ while in run() ---- BEGIN
CHANGE!!!!!!!!!!!!!!!!!
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ finishConnection() ---- BEGIN
------------ finishConnection() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ write() ---- BEGIN
------------ write() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
There's something in this while loop
------------ read() ---- BEGIN
------------ handleResponse() ---- BEGIN
--done!!!
RECEIVE: hehe|||
==========================================================
------------ send() ---- BEGIN
------------ send() ---- END
SEND: 2 hehe|||
--waiting...
------------ handleResponse() ---- END
------------ read() ---- END
------------ while in run() ---- END
------------ while in run() ---- BEGIN
^^^^^^^^^^^^^^^^^^^^^^^^
------------ while in run() ---- END
------------ while in run() ---- BEGIN
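
Reading the two logs side by side suggests a likely cause. The server's last read() prints BEGIN but never END, which is the path taken when numRead is -1, that is, when the peer has closed the connection. On the client side, MyResponseHandler.handleResponse() always returns true, and MyClient.handleResponse() closes the channel whenever the handler returns true, so the channel is probably already closed when "2 hehe|||" is queued. In addition, MyClient.send() queues data but never asks for OP_WRITE again after write() has switched the key back to OP_READ. The sketch below is a self-contained illustration of the intended behaviour, not a patch for the project above: it sends two messages over one SocketChannel, keeps the channel open after each reply, and re-arms OP_WRITE before each new message. The class OneChannelClientSketch, its methods, and the throwaway blocking echo server are hypothetical names introduced only for this example, and it assumes non-empty messages whose echo has the same length as what was sent.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OneChannelClientSketch {

    // Hypothetical helper: a throwaway blocking echo server for one connection.
    public static ServerSocket startEchoServer() throws IOException {
        ServerSocket server = new ServerSocket(0); // port 0 picks any free port
        Thread t = new Thread(() -> {
            try (Socket s = server.accept()) {
                InputStream in = s.getInputStream();
                OutputStream out = s.getOutputStream();
                byte[] buf = new byte[64];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
            } catch (IOException ignored) {
                // The demo is over once the client or the server socket closes.
            }
        });
        t.setDaemon(true);
        t.start();
        return server;
    }

    // Sends every message over the SAME channel and returns the echoed replies.
    public static List<String> sendAll(int port, String... messages) throws IOException {
        List<String> replies = new ArrayList<>();
        if (messages.length == 0) return replies;
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // A local connect may complete immediately; otherwise wait for OP_CONNECT.
            boolean connected = channel.connect(new InetSocketAddress("127.0.0.1", port));
            SelectionKey key = channel.register(selector,
                    connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

            int next = 0;
            ByteBuffer out = ByteBuffer.wrap(messages[next].getBytes(StandardCharsets.UTF_8));
            ByteBuffer in = ByteBuffer.allocate(out.remaining());
            while (replies.size() < messages.length) {
                if (selector.select(5000) == 0) {
                    throw new IOException("timed out waiting for the server");
                }
                if (key.isConnectable()) {
                    channel.finishConnect();
                    key.interestOps(SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {
                    channel.write(out);
                    if (!out.hasRemaining()) {
                        key.interestOps(SelectionKey.OP_READ); // message sent, wait for the echo
                    }
                } else if (key.isReadable()) {
                    if (channel.read(in) == -1) {
                        throw new IOException("server closed the connection");
                    }
                    if (!in.hasRemaining()) {
                        replies.add(new String(in.array(), StandardCharsets.UTF_8));
                        if (++next < messages.length) {
                            out = ByteBuffer.wrap(messages[next].getBytes(StandardCharsets.UTF_8));
                            in = ByteBuffer.allocate(out.remaining());
                            // Keep the channel open and ask for OP_WRITE again.
                            key.interestOps(SelectionKey.OP_WRITE);
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = startEchoServer()) {
            System.out.println(sendAll(server.getLocalPort(), "hehe|||", "2 hehe|||"));
        }
    }
}
```

In terms of the code above, the equivalent changes would presumably be to return false from MyResponseHandler.handleResponse() and to have MyClient.send() queue a ChangeRequest.CHANGEOPS request for SelectionKey.OP_WRITE, the way MyServer.send() already does.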

2 Answers

import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;

public class EchoServer
{
    public static int DEFAULT_PORT=7;

    public static void main(String [] args)
    {

        ServerSocketChannel serverChannel;
        Selector selector;
        try
        {
            serverChannel = ServerSocketChannel.open();
            ServerSocket ss = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(DEFAULT_PORT);
            ss.bind(address);
            serverChannel.configureBlocking(false);
            selector=Selector.open();
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
        } catch(IOException ex) {ex.printStackTrace(); return;}


        while(true)
        {
            int selectednum=0;
            try{
                selectednum=selector.select();  //blocks
            }catch (IOException ex) {ex.printStackTrace(); break;}
            if (selectednum>0) {
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key=iterator.next();
                    iterator.remove();
                    try{

                        if (key.isValid()==false) {key.cancel(); key.channel().close(); continue; }

                        if (key.isAcceptable()){
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel client = server.accept();
                            System.out.println("Accepted from "+client);
                            client.configureBlocking(false);
                            // Start with read interest only; OP_WRITE is added once there is data to echo
                            SelectionKey clientKey=client.register(selector, SelectionKey.OP_READ);
                            ByteBuffer buffer = ByteBuffer.allocate(100);
                            clientKey.attach(buffer);
                        }
                        if (key.isReadable()){
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer output = (ByteBuffer) key.attachment();
                            System.out.println("Reading.."+key.channel());
                            if (client.read(output) == -1) {
                                // Peer closed the connection: release the key and the channel
                                key.cancel();
                                client.close();
                                continue;
                            }
                            // There may be data to echo, so watch for writability as well
                            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        }
                        if (key.isWritable()){
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer output = (ByteBuffer) key.attachment();
                            output.flip();
                            System.out.println("Writing..");
                            client.write(output);
                            // Everything echoed: drop OP_WRITE so select() does not spin
                            if (!output.hasRemaining()) key.interestOps(SelectionKey.OP_READ);
                            output.compact();
                        }
                    } catch (IOException ex) {
                        key.cancel();
                        try { key.channel().close();}
                        catch (IOException cex) {}
                    }
                }
            }
        }
    }
}

- Client -

import java.net.*;
import java.io.*;


public class EchoClient
{

    public static void main(String [] args)
    {
        byte ch='a';
        // try-with-resources closes the socket on every exit path
        try (Socket socket = new Socket("localhost",7)) {
            OutputStream out = socket.getOutputStream();
            InputStream in = socket.getInputStream();

            // Send five bytes, one per second, printing each echoed byte
            for (int i=0; i<5; i++) {
                Thread.sleep(1000);
                out.write((byte) ch++);
                int echoed = in.read();
                if (echoed == -1) break;  // server closed the connection
                System.out.println((char) echoed);
            }
        }
        catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
        catch (IOException ex) { ex.printStackTrace(); }
    }

}
answered 2015-01-12T19:23:10.857

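I've just critiqued most of that code in another recent post. As for your own code, the response handlers shouldn't loop and sleep; they should either do blocking reads with a timeout, or loop calling select() with a timeout.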
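
As a rough illustration of the "select() with a timeout" option, here is a minimal sketch. SelectWithTimeoutSketch and awaitReady are hypothetical names that do not appear in the question's code; the only selector calls used are Selector.select(long) and Selector.selectNow(). The idea is that the waiting thread parks inside select() until a channel is ready or the deadline passes, instead of sleeping and polling.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class SelectWithTimeoutSketch {

    // Blocks until at least one registered channel is ready or the timeout elapses.
    // Returns the number of ready keys, or 0 if nothing became ready in time.
    public static int awaitReady(Selector selector, long timeoutMillis) throws IOException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0 || Thread.currentThread().isInterrupted()) {
                return selector.selectNow(); // final non-blocking check
            }
            // select(timeout) parks the thread; no sleep/poll loop is needed.
            int ready = selector.select(remainingMillis);
            if (ready > 0) {
                return ready;
            }
            // ready == 0 means timeout, wakeup() or interrupt: re-check the deadline.
        }
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("ready before connect: " + awaitReady(selector, 200));
            try (Socket client = new Socket("127.0.0.1", server.socket().getLocalPort())) {
                System.out.println("ready after connect: " + awaitReady(selector, 2000));
            }
        }
    }
}
```

A caller that gets 0 back can decide whether to retry, report a timeout, or close the channel, which is harder to do cleanly from inside a wait()/notify() loop that has no deadline.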

answered 2012-04-02T10:03:34.027