0

I am currently involved in doing POC for an RPC layer. I have written the following method to throttle requests on the client side. Is this a good pattern to follow? I did not choose queueing the additional requests into a threadpool because I am interested only in synchronous invocations and I want the caller thread to block until it is woken up for executing the RPC request and also because threadpool seems additional overhead because of creation of additional threads.

I thought I can manage with the threads which are already issuing the requests. This works well, but the CPU usage is a bit unfair to other processes because as soon as a call ends, another call goes out. I load tested it with a huge number of requests and memory and CPU usage are stable. Can I somehow use ArrayBlockingQueue with poll to achieve the same? Is poll() too much of a CPU hog?

Note: I recognise a few concurrency issues with requestEnd method where it might not wake up all registered items correctly and I am thinking of a performant way to maintain atomicity there.

 public class RequestQueue {    
    // TODO The capacity should come from the consumer which in turn comes from
    // config
    private static final int _OUTBOUND_REQUEST_QUEUE_MAXSIZE = 40000;
    private static final int _CURRENT_REQUEST_QUEUE_INCREMENT = 1;
    private static final int _CURRENT_REQUEST_POOL_MAXSIZE = 40;
    private AtomicInteger currentRequestsCount = new AtomicInteger(0);
    private ConcurrentLinkedQueue<RequestWaitItem> outboundRequestQueue = null;

    public RequestQueue() {
        outboundRequestQueue = new ConcurrentLinkedQueue<RequestWaitItem>();
    }



    public void registerForFuture(RequestWaitItem waitObject) throws Exception {
        if (outboundRequestQueue.size() < _OUTBOUND_REQUEST_QUEUE_MAXSIZE) {
            outboundRequestQueue.add(waitObject);
        } else {
            throw new Exception("Queue is full" + outboundRequestQueue.size());
        }
    }

    public void requestStart() {
        currentRequestsCount.addAndGet(_CURRENT_REQUEST_QUEUE_INCREMENT);
    }

    //Verify correctness
    public RequestWaitItem requestEnd() {
        int currentRequests = currentRequestsCount.decrementAndGet();
        if (this.outboundRequestQueue.size() > 0 && currentRequests < _CURRENT_REQUEST_POOL_MAXSIZE) {
            try {
                RequestWaitItem waitObject = (RequestWaitItem)this.outboundRequestQueue.remove();               
                waitObject.setRequestReady(true);
                synchronized (waitObject) {
                    waitObject.notify();
                }   
                return waitObject;
            } catch (NoSuchElementException ex) {
                //Queue is empty so this is not an exception condition
            }
        }
        return null;
    }

    public boolean isFull() {
        return currentRequestsCount.get() > _CURRENT_REQUEST_POOL_MAXSIZE;
    }

}




public class RequestWaitItem {
    private boolean requestReady;
    private RpcDispatcher dispatcher;

    public RequestWaitItem() {
        this.requestReady = false;
    }

    public RequestWaitItem(RpcDispatcher dispatcher) {
        this();
        this.dispatcher = dispatcher;
    }

    public boolean isRequestReady() {
        return requestReady;
    }

    public void setRequestReady(boolean requestReady) {
        this.requestReady = requestReady;
    }

    public RpcDispatcher getDispatcher() {
        return dispatcher;
    }
}




if (requestQueue.isFull()) {
        try {               
                RequestWaitItem waitObject = new RequestWaitItem(dispatcher);               
                requestQueue.registerForFuture(waitObject);
                //Sync              
                // Config and centralize this timeout
                synchronized (waitObject) {
                    waitObject.wait(_REQUEST_QUEUE_TIMEOUT);
                }

                if (waitObject.isRequestReady() == false) {
                    throw new Exception("Request Issuing timedout");
                }
                requestQueue.requestStart();
                try {
                    return waitObject.getDispatcher().dispatchRpcRequest();
                }finally {
                    requestQueue.requestEnd();
                }               
        } catch (Exception ex) {
            // TODO define exception type
            throw ex;
        }
    } else {
        requestQueue.requestStart();
        try {
            return dispatcher.dispatchRpcRequest();
        }finally {              
            requestQueue.requestEnd();
        }
    }
4

2 回答 2

1

If I understood correctly, you want to throttle requests to remote service, by having at most 40 (say) concurrent requests. You can do this easily, without extra threads or services, with a semaphore.

Semaphore s = new Semaphore(40);
...
s.acquire();
try {
    dispatcher.dispatchRpcRequest(); // Or whatever your remote call looks like
} finally {
    s.release();
}
于 2012-12-04T14:20:58.810 回答
0

Use ExecutorService service = Executors.newFixedThreadPool(10); for this.

This will create at the max 10 threads and further requests will wait in the queue. I guess this should help.

Fixed Thread Pool

于 2012-12-04T14:11:08.167 回答