2

我写了以下两行

ServerSocket mobCom = new ServerSocket(9846);
Socket server = mobCom.accept();

我希望创建一个新的 TCP 连接并由一个新线程处理该连接。例如上面的代码创建了一个服务器套接字。并且有多个客户。每当客户端连接到服务器时,可能会创建一个新线程来满足来自该特定客户端的请求。我如何实现相同的。

编辑

我还想将线程池限制为 10 个用户。如果出现更多用户,我想在不处理来自他们的进一步请求的情况下向他们发送错误消息。

4

2 回答 2

0

您可以使用 java util concurrent 的SynchronousQueue来实现所需的结果。创建固定数量的工人。使用take调用启动对 SynchronousQueue 的块读取。因此,如果所有工作人员都已完成一项工作并忙于处理它们(与套接字通信),则不会从 SynchronousQueue 读取任何内容,因此对同步队列的提议将失败。检查此故障(这意味着所有固定数量的工作人员都忙,现在没有人锁定到队列中),拒绝下一个请求。

以下行中的示例代码 [未经测试 - 为简洁起见避免异常,请根据您的需要进行修改]。

public class BoundedServer 
{
    public static void main(String[] args) 
    {
        /**
         * Port to serve
         */
        final int port = 2013;

        /**
         * Max Workers
         */
        final int maxworkers = 10; 

        /**
         * The server socket.
         */
        ServerSocket mServerSocket = null;

        /**
         * Queue of work units to process if there is a worker available.
         */
        final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>();

        /**
         * Queue of work units to reject if there is no current worker available.
         */
        final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>(); 

        /**
         * A thread pool to handle the work.
         */
        final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers);

        /**
         * Let a single thread take care of rejecting the requests when needed to do so.
         */
        final ExecutorService rejectionservice = Executors.newSingleThreadExecutor();

        try 
        {
            Runnable communicationlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        /**
                         * Set of workers to handle the work.
                         */
                        final CommunicationWorker[] workers = new CommunicationWorker[maxworkers];

                        communicationservice.invokeAll(Arrays.asList(workers));
                    }
                    finally
                    {
                        communicationservice.shutdown();
                    }
                }
            };

            new Thread(communicationlauncher).start();

            Runnable rejectionlauncher = new Runnable() 
            {
                public void run() 
                {
                    try
                    {
                        RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject);

                        rejectionservice.submit(rejectionworker);
                    }
                    finally
                    {
                        rejectionservice.shutdown();
                    }
                }
            };
            new Thread(rejectionlauncher).start();

            mServerSocket = new ServerSocket(port);

            while(true)
            {
                WorkUnit work = new WorkUnit(mServerSocket.accept());

                if(!mQueueToProcess.offer(work))
                {
                    mQueueToReject.add(work);
                }
            }
        } 
        finally
        {
            try
            {
                mServerSocket.close();
            }
        }
    }
}


public class WorkUnit 
{
    private Socket mSocket = null;

    public WorkUnit(Socket socket) 
    {
        super();
        this.setSocket(socket);
    }

    public Socket getSocket() {
        return mSocket;
    }

    public void setSocket(Socket mSocket) {
        this.mSocket = mSocket;
    }
}

public class CommunicationWorker 
implements Callable<Boolean> 
{
    private SynchronousQueue<WorkUnit> mQueueToProcess;

    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess) 
    {
        super();
        this.mQueueToProcess = queueToProcess;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToProcess.take();

            Socket socket = work.getSocket();

            // Code to handle socket communication and closure.
            // Once the communication is finished, this thread will get blocked to mQueueToProcess.
        }
    }
}


public class RejectionWorker 
implements Callable<Boolean> 
{
    private LinkedBlockingQueue<WorkUnit> mQueueToReject;

    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject) 
    {
        super();
        this.mQueueToReject = queueToReject;
    }

    @Override
    public Boolean call() throws Exception 
    {
        while(true)
        {
            WorkUnit work = mQueueToReject.take();

            Socket socket = work.getSocket();

            // Code to reject the request.
        }
    }
}
于 2013-03-27T20:09:57.920 回答
-1

您将不得不执行以下操作。ServiceThread 是线程将服务请求。

 while (true) {
              try {
                  Socket clientSocket = null;
                  if (null != serverSocket) {
                    clientSocket = serverSocket.accept();
                    ServiceThread serverThread = new ServiceThread(clientSocket); // Create a new thread for each client
                    serverThread.start();
                  }
              }  catch( Exception ex ) {
                  System.out.println("Exception while accepting connection " + ex.getMessage());
                  ex.printStackTrace();
              }
于 2013-03-27T18:13:53.060 回答