4

我不是一个非常有经验的 Java 程序员,所以如果这是一个新手问题,请原谅我。

我正在设计一个由 3 个模块组成的系统。客户端、服务器和应用程序。这个想法是客户端向服务器发送消息。服务器触发应用程序中的用例。用例的结果返回给服务器,服务器将结果发送给客户端。我选择了这种架构,因为我希望需要同时支持多个客户端,我希望能够在其他应用程序中重用服务器模块,我想让负责管理客户端连接的代码与尽可能地实现了领域逻辑,并且因为有机会学习了一些更高级的java。

我打算将各种模块与队列联系在一起。客户端很简单。发出一条消息并阻塞,直到响应到达(它可能过于简单,但目前它是一个合理的模型)。该应用程序同样没有问题。它阻塞其输入队​​列,在收到有效消息时执行用例并将结果推送到输出队列。拥有多个客户会使事情变得更加棘手,但以我的经验水平仍然在我的掌握之中。服务器为每个打开的连接维护线程,并且服务器出站/应用程序入站队列是同步的,因此如果 2 条消息同时到达,则第二个线程只需等待第一个线程将其有效负载传递到队列中。

问题是中间的部分,服务器,它必须阻塞两个独立的东西。服务器同时监视客户端和应用程序的输出队列(用作服务器的输入队列)。服务器需要阻塞,直到有消息从客户端进来(然后转发给应用程序),或者直到应用程序完成任务并将结果推送到应用程序出站/服务器入站队列中。

据我所知,Java 只能阻止一件事。

在客户端发送消息或服务器入站队列不再为空之前,是否可以让服务器阻塞?

更新:

我有一些时间来解决这个问题,并设法将问题减少到说明问题的最低限度。即使进行了修整,也会出现一些庞大的代码转储,因此对此表示歉意。我会尽量打破它。

这是服务器的代码:

public class Server implements Runnable {

    private int                     listenPort          = 0;
    private ServerSocket            serverSocket        = null;
    private BlockingQueue<Message>  upstreamMessaes     = null;
    private BlockingQueue<Message>  downstreamMessages  = null;
    private Map<Integer, Session>   sessions            = new ConcurrentHashMap ();
    private AtomicInteger           lastId              = new AtomicInteger ();

    /**
     * Start listening for clients to process
     * 
     * @throws IOException 
     */
    @Override
    public void run () {
        int     newSessionId;
        Session newSession;
        Thread  newThread;

        System.out.println (this.getClass () + " running");

        // Client listen loop
        while (true) {
            newSessionId    = this.lastId.incrementAndGet ();
            try {
                newSession  = new Session (this, newSessionId);
                newThread   = new Thread (newSession);
                newThread.setName ("ServerSession_" + newSessionId);
                this.sessions.put (newSessionId, newSession);
                newThread.start ();
            } catch (IOException ex) {
                Logger.getLogger (Server.class.getName ()).log (Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Accept a connection from a new client
     * 
     * @return The accepted Socket
     * @throws IOException 
     */
    public Socket accept () throws IOException {
        return this.getSocket().accept ();
    }

    /**
     * Delete the specified Session
     * 
     * @param sessionId ID of the Session to remove
     */
    public void deleteSession (int sessionId) {
        this.sessions.remove (sessionId);
    }

    /**
     * Forward an incoming message from the Client to the application
     * 
     * @param msg
     * @return
     * @throws InterruptedException 
     */
    public Server messageFromClient (Message msg) throws InterruptedException {
        this.upstreamMessaes.put (msg);
        return this;
    }

    /**
     * Set the port to listen to
     * 
     * We can only use ports in the range 1024-65535 (ports below 1024 are 
     * reserved for common protocols such as HTTP and ports above 65535 don't
     * exist)
     * 
     * @param listenPort
     * @return Returns itself so further methods can be called
     * @throws IllegalArgumentException 
     */
    public final Server setListenPort (int listenPort) throws IllegalArgumentException {
        if ((listenPort > 1023) && (listenPort <= 65535)) {
            this.listenPort = listenPort;
        } else {
            throw new IllegalArgumentException ("Port number " + listenPort + " not valid");
        }

        return this;
    }

    /**
     * Get the server socket, initialize it if it isn't already started.
     * 
     * @return The object's ServerSocket
     * @throws IOException 
     */
    private ServerSocket getSocket () throws IOException {
        if (null == this.serverSocket) {
            this.serverSocket = new ServerSocket (this.listenPort);
        }

        return this.serverSocket;
    }

    /**
     * Instantiate the server
     * 
     * @param listenPort
     * @throws IllegalArgumentException 
     */
    public Server ( int listenPort, 
                    BlockingQueue<Message> incomingMessages, 
                    BlockingQueue<Message> outgoingMessages) throws IllegalArgumentException {
        this.setListenPort (listenPort);
        this.upstreamMessaes    = incomingMessages;
        this.downstreamMessages = outgoingMessages;
        System.out.println (this.getClass () + " created");
        System.out.println ("Listening on port " + listenPort);
    }
}

我相信以下方法属于服务器,但目前已被注释掉。

    /**
     * Notify a Session of a message for it
     * 
     * @param sessionMessage 
     */
    public void notifySession () throws InterruptedException, IOException {
        Message sessionMessage  = this.downstreamMessages.take ();
        Session targetSession   = this.sessions.get (sessionMessage.getClientID ());
        targetSession.waitForServer (sessionMessage);
    }

这是我的会话课

public class Session implements Runnable {
    private Socket              clientSocket    = null;
    private OutputStreamWriter  streamWriter    = null;
    private StringBuffer        outputBuffer    = null;
    private Server              server          = null;
    private int                 sessionId       = 0;

    /**
     * Session main loop
     */
    @Override
    public void run () {

        StringBuffer    inputBuffer = new StringBuffer ();
        BufferedReader  inReader;

        try {
            // Connect message
            this.sendMessageToClient ("Hello, you are client " + this.getId ());
            inReader    = new BufferedReader (new InputStreamReader (this.clientSocket.getInputStream (), "UTF8"));

            do {
                // Parse whatever was in the input buffer
                inputBuffer.delete (0, inputBuffer.length ());
                inputBuffer.append (inReader.readLine ());
                System.out.println ("Input message was: " + inputBuffer);
                this.server.messageFromClient (new Message (this.sessionId, inputBuffer.toString ()));
            } while (!"QUIT".equals (inputBuffer.toString ()));

            // Disconnect message
            this.sendMessageToClient ("Goodbye, client " + this.getId ());
        } catch (IOException | InterruptedException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
        } finally {
            this.terminate ();
            this.server.deleteSession (this.getId ());
        }
    }

    /**
     * 
     * @param msg
     * @return
     * @throws IOException 
     */
    public Session waitForServer (Message msg) throws IOException {
        // Generate a response for the input
        String output   = this.buildResponse (msg.getPayload ()).toString ();
        System.out.println ("Output message will be: " + output);

        // Output to client
        this.sendMessageToClient (output);

        return this;
    }

    /**
     * 
     * @param request
     * @return 
     */
    private StringBuffer buildResponse (CharSequence request) {
        StringBuffer ob = this.outputBuffer;
        ob.delete (0, this.outputBuffer.length ());

        ob.append ("Server repsonded at ")
          .append (new java.util.Date ().toString () )
          .append (" (You said '" )
          .append (request)
          .append ("')");

        return this.outputBuffer;
    }

    /**
     * Send the given message to the client
     * 
     * @param message
     * @throws IOException 
     */
    private void sendMessageToClient (CharSequence message) throws IOException {
        // Output to client
        OutputStreamWriter osw  = this.getStreamWriter ();
        osw.write ((String) message);
        osw.write ("\r\n");
        osw.flush ();
    }

    /**
     * Get an output stream writer, initialize it if it's not active
     * 
     * @return A configured OutputStreamWriter object
     * @throws IOException 
     */
    private OutputStreamWriter getStreamWriter () throws IOException {
        if (null == this.streamWriter) {
            BufferedOutputStream os = new BufferedOutputStream (this.clientSocket.getOutputStream ());
            this.streamWriter       = new OutputStreamWriter (os, "UTF8");
        }

        return this.streamWriter;
    }

    /**
     * Terminate the client connection
     */
    private void terminate () {
        try {
            this.streamWriter   = null;
            this.clientSocket.close ();
        } catch (IOException e) {
            Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
        }
    }

    /**
     * Get this Session's ID
     * 
     * @return The ID of this session
     */
    public int getId () {
        return this.sessionId;
    }

    /**
     * Session constructor
     * 
     * @param owner The Server object that owns this session
     * @param sessionId The unique ID this session will be given
     * @throws IOException 
     */
    public Session (Server owner, int sessionId) throws IOException {
        System.out.println ("Class " + this.getClass () + " created");

        this.server         = owner;
        this.sessionId      = sessionId;
        this.clientSocket   = this.server.accept ();

        System.out.println ("Session ID is " + this.sessionId);
    }
}

测试应用程序相当基本,它只是回显原始请求消息的修改版本。真正的应用程序将在收到消息并向服务器返回有意义的响应时工作。

public class TestApp implements Runnable {
    private BlockingQueue <Message> inputMessages, outputMessages;

    @Override
    public void run () {
        Message         lastMessage;
        StringBuilder   returnMessage   = new StringBuilder ();

        while (true) {
            try {
                lastMessage = this.inputMessages.take ();

                // Construct a response
                returnMessage.delete (0, returnMessage.length ());
                returnMessage.append ("Server repsonded at ")
                             .append (new java.util.Date ().toString () )
                             .append (" (You said '" )
                             .append (lastMessage.getPayload ())
                             .append ("')");

                // Pretend we're doing some work that takes a while
                Thread.sleep (1000);

                this.outputMessages.put (new Message (lastMessage.getClientID (), lastMessage.toString ()));
            } catch (InterruptedException ex) {
                Logger.getLogger (TestApp.class.getName ()).log (Level.SEVERE, null, ex);
            }
        }
    }

    /**
     * Initialize the application
     * 
     * @param inputMessages Where input messages come from
     * @param outputMessages Where output messages go to
     */
    public TestApp (BlockingQueue<Message> inputMessages, BlockingQueue<Message> outputMessages) {
        this.inputMessages = inputMessages;
        this.outputMessages = outputMessages;
        System.out.println (this.getClass () + " created");
    }
}

Message 类非常简单,仅由原始客户端 ID 和有效负载字符串组成,因此我将其省略了。

最后主类看起来像这样。

public class Runner {
    /**
     * 
     * @param args The first argument is the port to listen on.
     * @throws Exception  
     */
    public static void main (String[] args) throws Exception {
        BlockingQueue<Message>  clientBuffer    = new LinkedBlockingQueue (); 
        BlockingQueue<Message>  appBuffer       = new LinkedBlockingQueue ();
        TestApp appInstance                     = new TestApp (clientBuffer, appBuffer);
        Server serverInstance                   = new Server (Integer.parseInt (args [0]), clientBuffer, appBuffer);
        Thread appThread                        = new Thread (appInstance);
        Thread serverThread                     = new Thread (serverInstance);

        appThread.setName("Application");
        serverThread.setName ("Server");

        appThread.start ();
        serverThread.start ();

        appThread.join ();
        serverThread.join ();

        System.exit (0);
    }
}

虽然实际应用程序会更复杂,但 TestApp 说明了基本的使用模式。它阻塞在它的输入队列上,直到那里有东西,处理它,然后将结果推送到它的输出队列中。

会话类管理特定客户端和服务器之间的实时连接。它从客户端获取输入并将其转换为 Message 对象,并从服务器获取 Message 对象并将它们转换为输出以发送给客户端。

服务器侦听新的传入连接并为其拥有的每个传入连接设置一个 Session 对象。当 Session 向它传递 Message 时,它​​会将其放入上游队列以供应用程序处理。

我遇到的困难是让返回消息从 TestApp 传回各个客户端。当来自客户端的消息进来时,Session 生成一个 Message 并将其发送到 Server,然后将其放入其上游队列,该上游队列也是 TestApp 的输入队列。作为响应,TestApp 生成响应 Message 并将其放入输出队列,该队列也是 Server 的下游队列。

这意味着 Sessions 需要等待两个不相关的事件。他们应该阻止直到

  • 输入来自客户端(客户端套接字上的 BufferedReader 有输入要处理),
  • 或者服务器向它发送一条消息(服务器在会话上调用 WaitForServer() 方法)

至于服务器本身,它也必须等待两个不相关的事件。

  • 会话调用 messageFromClient() 并传递一条消息给 TestApp,
  • 或者 TestApp 将消息推送到输出/下游队列以传递到会话。

表面上看起来是一项简单的任务,但事实证明比我最初想象的要困难得多。我希望我忽略了一些明显的东西,因为我对并发编程还是很陌生,但是如果你能帮助指出我哪里出错了,我会很感激指导。

4

2 回答 2

1

因为您的实现是使用方法在客户端-会话-服务器之间传递数据,所以您实际上已经解决了眼前的问题。但是,这可能不是您的本意。这是正在发生的事情:

Session 的run方法在自己的线程中运行,阻塞在套接字上。当服务器调用waitForServer时,该方法立即在服务器的线程中执行——在 Java 中,如果一个线程调用一个方法,那么该方法在该线程中执行,因此在这种情况下 Session 不需要解除阻塞。为了创建您要解决的问题,您需要删除该waitForServer方法并将其替换为BlockingQueue messagesFromServer队列 - 然后服务器会将消息放入此队列中,而 Session 将需要阻止它,导致 Session 需要阻塞两个不同的对象(套接字和队列)。

假设您切换到 Session 需要阻塞两个对象的实现,我认为您可以使用我在评论中描述的两种方法的混合来解决这个问题:

每个 Session 的套接字都需要一个线程来阻塞它——我看不出有什么办法,除非你愿意用一个固定的线程池(比如 4 个线程)来替换它,它会轮询套接字并休眠几个十几毫秒,如果没有什么可以从它们读取。

您可以使用单个队列和阻塞它的单个线程来管理所有服务器 -> 会话流量 - 服务器在其有效负载中包含会话“地址”,以便阻塞它的线程知道如何处理消息。如果您发现当您有很多会话时这无法扩展,那么您总是可以增加线程/队列计数,例如,对于 32 个会话,您可以有 4 个线程/队列,每个线程/队列有 8 个会话。

于 2013-04-27T14:38:45.747 回答
0

我可能误解了,但似乎你有代码“监听”消息的地方,你应该能够使用简单的 OR 语句来解决这个问题。

另一件可能有用的事情是为每个客户端添加一个唯一的 id,这样您就可以知道该消息是针对哪个客户端的。

于 2013-04-26T17:28:17.820 回答