I am currently working on a proof of concept (POC) for an RPC layer, and I have written the following code to throttle requests on the client side. Is this a good pattern to follow? I chose not to queue the additional requests into a thread pool for two reasons: I am only interested in synchronous invocations, so I want the caller thread to block until it is woken up to execute its RPC request, and a thread pool seems like unnecessary overhead because it creates additional threads.
I thought I could manage with the threads that are already issuing the requests. This works well, but the CPU usage is a bit unfair to other processes because as soon as one call ends, another goes out. I load tested it with a large number of requests, and memory and CPU usage were stable. Could I use an ArrayBlockingQueue with poll() to achieve the same thing? Is poll() too much of a CPU hog?
Note: I recognise a few concurrency issues in the requestEnd method, where it might not wake up all registered items correctly, and I am looking for a performant way to maintain atomicity there.
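To make the ArrayBlockingQueue idea concrete, this is roughly what I have in mind. It is only a sketch: the class PermitThrottle and its methods tryEnter, exit and available are hypothetical names and not part of my current code. The queue is pre-filled with one permit per allowed in-flight request. As far as I understand, the timed poll(long, TimeUnit) parks the calling thread until a permit arrives or the timeout expires, whereas the no-argument poll() returns immediately and would only burn CPU if I called it in a loop.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the queue holds permits, one per allowed in-flight request.
final class PermitThrottle {
    private static final Object PERMIT = new Object();
    private final ArrayBlockingQueue<Object> permits;

    PermitThrottle(int maxInFlight) {
        permits = new ArrayBlockingQueue<Object>(maxInFlight);
        // Pre-fill the queue so that maxInFlight callers can proceed without waiting.
        for (int i = 0; i < maxInFlight; i++) {
            permits.add(PERMIT);
        }
    }

    // Blocks the caller for up to timeoutMillis waiting for a permit.
    // Returns false on timeout or if the thread is interrupted while waiting.
    boolean tryEnter(long timeoutMillis) {
        try {
            return permits.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Hands the permit back; call exactly once per successful tryEnter.
    void exit() {
        permits.offer(PERMIT);
    }

    // Number of permits currently free.
    int available() {
        return permits.size();
    }
}
```

The caller would wrap the dispatch in tryEnter and a finally block that calls exit, much like requestStart and requestEnd are used now.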

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RequestQueue {
        // TODO The capacity should come from the consumer, which in turn comes from config
        private static final int _OUTBOUND_REQUEST_QUEUE_MAXSIZE = 40000;
        private static final int _CURRENT_REQUEST_QUEUE_INCREMENT = 1;
        private static final int _CURRENT_REQUEST_POOL_MAXSIZE = 40;

        private final AtomicInteger currentRequestsCount = new AtomicInteger(0);
        private final ConcurrentLinkedQueue<RequestWaitItem> outboundRequestQueue =
                new ConcurrentLinkedQueue<RequestWaitItem>();

        // Registers a caller that has to wait for a free slot.
        public void registerForFuture(RequestWaitItem waitObject) throws Exception {
            // Note: size() on a ConcurrentLinkedQueue traverses the queue and is
            // only approximate under concurrent modification.
            if (outboundRequestQueue.size() < _OUTBOUND_REQUEST_QUEUE_MAXSIZE) {
                outboundRequestQueue.add(waitObject);
            } else {
                throw new Exception("Queue is full: " + outboundRequestQueue.size());
            }
        }

        public void requestStart() {
            currentRequestsCount.addAndGet(_CURRENT_REQUEST_QUEUE_INCREMENT);
        }

        // Verify correctness
        public RequestWaitItem requestEnd() {
            int currentRequests = currentRequestsCount.decrementAndGet();
            if (currentRequests < _CURRENT_REQUEST_POOL_MAXSIZE) {
                // poll() returns null on an empty queue, so no size() check or
                // NoSuchElementException handling is needed.
                RequestWaitItem waitObject = outboundRequestQueue.poll();
                if (waitObject != null) {
                    // Set the flag under the same lock the waiter uses.
                    synchronized (waitObject) {
                        waitObject.setRequestReady(true);
                        waitObject.notify();
                    }
                    return waitObject;
                }
            }
            return null;
        }

        public boolean isFull() {
            // >= rather than >: with > a 41st request was allowed to start.
            // The check-then-act race with requestStart() still remains.
            return currentRequestsCount.get() >= _CURRENT_REQUEST_POOL_MAXSIZE;
        }
    }

    public class RequestWaitItem {
        // volatile so the waiting thread sees the update made by the thread
        // that calls requestEnd()
        private volatile boolean requestReady;
        private RpcDispatcher dispatcher;

        public RequestWaitItem() {
            this.requestReady = false;
        }

        public RequestWaitItem(RpcDispatcher dispatcher) {
            this();
            this.dispatcher = dispatcher;
        }

        public boolean isRequestReady() {
            return requestReady;
        }

        public void setRequestReady(boolean requestReady) {
            this.requestReady = requestReady;
        }

        public RpcDispatcher getDispatcher() {
            return dispatcher;
        }
    }

    // Caller side, inside the method that issues the RPC
    if (requestQueue.isFull()) {
        RequestWaitItem waitObject = new RequestWaitItem(dispatcher);
        requestQueue.registerForFuture(waitObject);
        // TODO Config and centralize this timeout
        synchronized (waitObject) {
            // Skip the wait if requestEnd() has already marked this item ready;
            // otherwise a notify() sent before wait() would be missed.
            if (!waitObject.isRequestReady()) {
                waitObject.wait(_REQUEST_QUEUE_TIMEOUT);
            }
        }
        if (!waitObject.isRequestReady()) {
            // TODO define exception type
            throw new Exception("Request issuing timed out");
        }
        requestQueue.requestStart();
        try {
            return waitObject.getDispatcher().dispatchRpcRequest();
        } finally {
            requestQueue.requestEnd();
        }
    } else {
        requestQueue.requestStart();
        try {
            return dispatcher.dispatchRpcRequest();
        } finally {
            requestQueue.requestEnd();
        }
    }
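
For the atomicity concern around requestEnd, one alternative I am weighing is to let java.util.concurrent.Semaphore do both the counting and the waking, so that the separate isFull, requestStart and requestEnd steps collapse into a single acquire and release. The sketch below is an assumption about how that could look, not tested against my RPC layer: SemaphoreThrottle, call and freeSlots are hypothetical names, and the RPC is passed in as a plain Callable rather than my RpcDispatcher.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a semaphore bounds the number of in-flight requests.
final class SemaphoreThrottle {
    private final Semaphore slots;
    private final long timeoutMillis;

    SemaphoreThrottle(int maxInFlight, long timeoutMillis) {
        // Fair mode hands out permits in arrival order, similar to the FIFO queue.
        this.slots = new Semaphore(maxInFlight, true);
        this.timeoutMillis = timeoutMillis;
    }

    // Blocks the calling thread until a slot is free or the timeout expires,
    // then runs the RPC on that same thread.
    <T> T call(Callable<T> rpc) throws Exception {
        if (!slots.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting to issue request");
        }
        try {
            return rpc.call();
        } finally {
            slots.release(); // always free the slot, even if the RPC throws
        }
    }

    // Number of slots currently free.
    int freeSlots() {
        return slots.availablePermits();
    }
}
```

With something like this, the caller side would shrink to a single throttle.call(...) that wraps dispatcher.dispatchRpcRequest(), and a caller that times out leaves nothing behind in a queue.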