您可以使用 java.util.concurrent 中的 SynchronousQueue 来实现所需的效果。创建固定数量的工作线程,每个工作线程通过调用 take() 在 SynchronousQueue 上阻塞等待读取。因此,如果所有工作线程都已各自取得一项任务并正忙于处理(与套接字通信),就不会有线程从 SynchronousQueue 读取,此时对该同步队列调用 offer() 将失败(返回 false)。检查这一失败(它意味着全部固定数量的工作线程都在忙,当前没有任何线程阻塞在队列上),并据此拒绝这个新到的请求。
下面是示例代码[未经测试;为简洁起见省略了异常处理,请根据您的需要进行修改]。
public class BoundedServer
{
public static void main(String[] args)
{
/**
* Port to serve
*/
final int port = 2013;
/**
* Max Workers
*/
final int maxworkers = 10;
/**
* The server socket.
*/
ServerSocket mServerSocket = null;
/**
* Queue of work units to process if there is a worker available.
*/
final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>();
/**
* Queue of work units to reject if there is no current worker available.
*/
final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>();
/**
* A thread pool to handle the work.
*/
final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers);
/**
* Let a single thread take care of rejecting the requests when needed to do so.
*/
final ExecutorService rejectionservice = Executors.newSingleThreadExecutor();
try
{
Runnable communicationlauncher = new Runnable()
{
public void run()
{
try
{
/**
* Set of workers to handle the work.
*/
final CommunicationWorker[] workers = new CommunicationWorker[maxworkers];
communicationservice.invokeAll(Arrays.asList(workers));
}
finally
{
communicationservice.shutdown();
}
}
};
new Thread(communicationlauncher).start();
Runnable rejectionlauncher = new Runnable()
{
public void run()
{
try
{
RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject);
rejectionservice.submit(rejectionworker);
}
finally
{
rejectionservice.shutdown();
}
}
};
new Thread(rejectionlauncher).start();
mServerSocket = new ServerSocket(port);
while(true)
{
WorkUnit work = new WorkUnit(mServerSocket.accept());
if(!mQueueToProcess.offer(work))
{
mQueueToReject.add(work);
}
}
}
finally
{
try
{
mServerSocket.close();
}
}
}
}
/**
 * Holder for the socket of one connection, passed between the server and
 * its workers through the queues.
 */
public class WorkUnit
{
    /** Socket carried by this unit; {@code null} until one has been set. */
    private Socket mSocket;

    /**
     * Creates a work unit carrying the given socket.
     *
     * @param socket the socket to carry; stored as-is
     */
    public WorkUnit(Socket socket)
    {
        setSocket(socket);
    }

    /**
     * Replaces the socket carried by this unit.
     *
     * @param socket the new socket; stored as-is
     */
    public void setSocket(Socket socket)
    {
        mSocket = socket;
    }

    /**
     * @return the socket most recently stored in this unit
     */
    public Socket getSocket()
    {
        return mSocket;
    }
}
/**
 * Worker that repeatedly takes a {@link WorkUnit} from a
 * {@link SynchronousQueue} and serves its socket.
 *
 * <p>While the worker is busy with one socket it is not blocked in
 * {@code take()}, so it does not accept another hand-off until it loops back.
 */
public class CommunicationWorker
implements Callable<Boolean>
{
    /** Queue this worker blocks on to receive its next work unit. */
    private final SynchronousQueue<WorkUnit> mQueueToProcess;

    /**
     * @param queueToProcess queue from which work units are taken
     */
    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess)
    {
        super();
        this.mQueueToProcess = queueToProcess;
    }

    /**
     * Serves work units one at a time, forever; never returns normally.
     *
     * @throws Exception {@link InterruptedException} when interrupted while
     *         waiting in {@code take()}, or whatever the handling code throws
     */
    @Override
    public Boolean call() throws Exception
    {
        while (true)
        {
            WorkUnit work = mQueueToProcess.take();
            Socket socket = work.getSocket();
            try
            {
                // Code to handle socket communication.
            }
            finally
            {
                // Always release the connection, even if handling threw;
                // closing an already-closed socket has no effect.
                if (socket != null)
                {
                    try
                    {
                        socket.close();
                    }
                    catch (java.io.IOException ignored)
                    {
                        // Nothing useful to do if close fails; keep serving.
                    }
                }
            }
            // Communication is finished; loop back and block on mQueueToProcess.
        }
    }
}
/**
 * Worker that repeatedly takes a {@link WorkUnit} from the rejection queue
 * and rejects it, closing its socket.
 */
public class RejectionWorker
implements Callable<Boolean>
{
    /** Queue of work units waiting to be rejected. */
    private final LinkedBlockingQueue<WorkUnit> mQueueToReject;

    /**
     * @param queueToReject queue from which work units to reject are taken
     */
    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject)
    {
        super();
        this.mQueueToReject = queueToReject;
    }

    /**
     * Rejects queued work units one at a time, forever; never returns normally.
     *
     * @throws Exception {@link InterruptedException} when interrupted while
     *         waiting in {@code take()}, or whatever the rejection code throws
     */
    @Override
    public Boolean call() throws Exception
    {
        while (true)
        {
            WorkUnit work = mQueueToReject.take();
            Socket socket = work.getSocket();
            try
            {
                // Code to reject the request (e.g. send a "busy" response).
            }
            finally
            {
                // A rejected connection must still be released, otherwise
                // every refused client leaks a socket.
                if (socket != null)
                {
                    try
                    {
                        socket.close();
                    }
                    catch (java.io.IOException ignored)
                    {
                        // Nothing useful to do if close fails; keep rejecting.
                    }
                }
            }
        }
    }
}