2

我正在使用 Java 实现服务器/客户端系统。服务器侦听来自客户端的传入连接,在客户端连接后,服务器创建一个新套接字并将其传递给仅用于接收数据的新线程:

while (true){

        clientSocket=serverSocket.accept();
        new ClientReceiver(clientSocket,this.clientsManager).start();
    }

clientReceiver 类如下:

public class ClientReceiver extends Thread {

private Socket clientSocket=null;
private Client client=null;
private ClientsManager clientsManager;

private ClientActionParser clientActionParser=new ClientActionParser();

ClientHandlerState clientHandlerState;

PrintWriter outputStream=null;
BufferedReader inputStream=null;

public ClientReceiver(Socket clientSocket, ClientsManager clientsManager){

    this.clientSocket=clientSocket;
    this.clientsManager=clientsManager;
    this.setClientHandlerState(ClientHandlerState.Connected);
}

public void run(){

    String actionString;
    try{
        //define output and input stream to client
        outputStream =new PrintWriter(clientSocket.getOutputStream(),true);
        inputStream = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));



        //while client is connected read input to actionString
        while((actionString=inputStream.readLine()) != null){

                AbstractClientAction clientAction= this.clientActionParser.parse(actionString);

                if(this.getClientHandlerState()==ClientHandlerState.Connected){

                    if(clientAction instanceof ClientLoginAction){

                        ClientLoginAction clientLoginAction=(ClientLoginAction) clientAction;
                        if(this.authenticate(clientLoginAction)){


                        }
                        else{
                                throw new AuthenticationException();
                        }

                    }
                    else{

                        throw new AuthenticationException();
                    }
                }

            }
            if(this.getClientHandlerState()==ClientHandlerState.Authorized){

                //receive other client actions: transfer barge ....
            }
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    }
    catch(IOException e){

    }
    catch (AuthenticationException e) {
        // TODO: handle exception
    }

    //clean up the resources
    try{
        outputStream.close();
        inputStream.close();
        clientSocket.close();
    }
    catch(Exception e){

    }
}

private boolean authenticate(ClientLoginAction clientLoginAction){
    //perform authentication. If authentication successful:
    this.client=this.clientsManager.authenticateClient(clientLoginAction.getUsername(), clientLoginAction.getPassword());
    if(this.client==null){
        return false;
    }
    else{
        ClientSender clientSender=new ClientSender(this.outputStream, this.client);
        this.clientsManager.addClientSender(clientSender);
        this.setClientHandlerState(ClientHandlerState.Authorized);
        clientSender.start();
        return true;
    }
}

public ClientHandlerState getClientHandlerState(){

    return this.clientHandlerState;
}

public void setClientHandlerState(ClientHandlerState clientHandlerState){

    this.clientHandlerState=clientHandlerState;
}

在接收者线程中成功验证后,创建一个新线程用于向客户端发送数据,并将套接字的 outputStream 传递给新线程。clientSender 类包含一个队列作为缓冲区,其中包含应发送到客户端的数据。这是类clientSender:

public class ClientSender extends Thread {

private Client client=null;
private final Log logger = LogFactory.getLog(getClass());
PrintWriter outputStream=null;
private Queue<String> clientEventsQueue= new LinkedList<String>();

public ClientSender(PrintWriter outputStream, Client client){
    this.outputStream=outputStream;
    this.client=client;
}

public void run(){

    //System.out.println("ClientSender run method called.");

    while(true){

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        if(!this.clientEventsQueue.isEmpty()){

            this.outputStream.println(this.clientEventsQueue.remove());
        }
    }
}

public Client getClient(){

    return this.client;
}

public void insertClientEvent(String clientEvent){

    this.clientEventsQueue.add(clientEvent);
}

每当我想向我使用的客户发送一些东西时:

clientSender.insertClientEvent("some text");

问题是如果我删除 Thread.sleep(10) 我将不会在客户端收到任何东西。由于 TCP 套接字被阻塞,我认为这不应该发生。这是正常的还是我做错了什么?

编辑: 发件人线程没有“终止”。当服务器从另一个系统接收到事件时,它应该向所有客户端发送适当的信息。所以我认为最好的方案是在没有数据发送时停止线程并在有数据时启动它。所以我在 clientSender 类中尝试了这个:

public void run(){

    while(true){

        if(this.clientEventsQueue.isEmpty()){
            break;
        }
        else{
            try {
                this.outputStream.println(this.clientEventsQueue.take());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

但现在的问题是何时启动线程?每当我想发送数据时,我都尝试启动它,但正如我所料,它不能正常工作,只发送第一个包:

clientSender.insertClientEvent(clientEvent.getEventString());
clientSender.start();

EDIT2 我想出了这个主意。它非常简单,我认为它消耗的 CPU 时间要少得多。

while(true){

        while(this.clientEventsQueue.isEmpty()){

            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        try {
            this.outputStream.println(this.clientEventsQueue.take());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

尽管我测试了它,但它工作得很好。你怎么看待这件事?

4

1 回答 1

4

我看到您正在使用 aLinkedList作为由多个线程访问的队列,并且您在ClientSender.
此实现不是线程安全的,可能会导致问题,例如clientEvents发布在其上的内容对ClientSender线程不可见、CPU 被浪费等。

您可以改用 aBlockingQueue并调用take()它来阻止队列,直到发布事件。

我也看到你是靠sleep(400)等待通讯的。这也会引起问题。使用套接字资源的线程可以在完成后关闭它,而不是这样。

编辑:
有许多技术可以处理终止线程。我认为“毒丸”在这种情况下会很好用。基本上你这样做:

String stuff = queue.take();
if (stuff == null) break;

null在您想终止队列时发布(不必为空,可以是任何东西,例如"terminate"等。

EDIT2:
您的终止方式不起作用,因为它会在任何人在其上发布事件之前立即终止。从理论上讲,您可能会产生然后立即一遍又一遍地杀死线程等。解决此问题的最简单方法是使用特殊消息(又名“毒丸”)作为终止条件。

至于只有在有事件时才有线程,那时我建议使用线程池。您可以将发送的事件封装到 aRunnable中,并将套接字保存在 Map 中。然而,这实现起来相当复杂,并且需要对多线程有很好的理解才能使其正确。多线程很困难,如果做错了会引起严重的头痛。Tbh 我不建议在不学习更多关于多线程编程的情况下尝试这样做。

EDIT3: @user2355734:许多人都像您一样按间隔轮询队列,但不鼓励这样做。take() 方法实际上会“休眠”,并且只有在队列中有东西时才会唤醒,因此理论上通过移除“休眠”循环,您应该会获得更低的 CPU 使用率和更短的延迟。一般来说,您应该尽量避免在多线程代码中完全使用“睡眠”。您很少真正需要它,而且它通常是代码损坏/次优的标志。至于测试,虽然它们很有用,但很难甚至几乎不可能通过测试来保证多线程代码的正确性。您的代码可能在您的测试中运行良好,但在生产、高负载、不同环境等情况下会失败。因此它'

于 2013-06-12T19:58:07.117 回答