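I am working on a project based on REST services. It has two components: a client, which builds the URLs for the service component, and a service (REST service) component, which then uses those URLs to fetch data from the database.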
A typical URL looks like this -
http://host.qa.ebay.com:8080/deservice/DEService/get/USERID=9012/PROFILE.ACCOUNT,PROFILE.ADVERTISING,PROFILE.DEMOGRAPHIC,PROFILE.FINANCIAL
The URL above means: for USERID 9012, give me the data from the database for these columns -
[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]
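To make the URL layout concrete, here is a minimal sketch of how such a URL could be assembled. The class name `UrlSketch`, the `BASE` constant, and the use of a plain `String` key are assumptions for illustration only; the real `buildGetUrl` takes a `List<DEKey>` and its body is not shown in this post. The sketch also does no URL encoding.

```java
import java.util.List;

public class UrlSketch {
    // Assumption: base path copied from the example URL above.
    static final String BASE = "http://host.qa.ebay.com:8080/deservice/DEService/get/";

    // Appends the key (e.g. "USERID=9012") and the comma-separated attribute names.
    // Returns null when there is nothing to request.
    static String buildGetUrl(String key, List<String> attrNames) {
        if (key == null || attrNames == null || attrNames.isEmpty()) {
            return null;
        }
        return BASE + key + "/" + String.join(",", attrNames);
    }
}
```

Calling `buildGetUrl("USERID=9012", ...)` with the four PROFILE attributes reproduces the example URL shown above.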
I am currently benchmarking the client component. I found that the method below takes a fair amount of time: about 15 ms at the 95th percentile.
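For clarity on what "95th percentile" means here, this is a small sketch of one common definition (nearest rank) applied to a set of latency samples. The class and method names are hypothetical, and the benchmarking tool I use may compute percentiles slightly differently.

```java
import java.util.Arrays;

public class PercentileSketch {
    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    static long percentile(long[] samples, double p) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samples.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```

With this definition, a 95th percentile of 15 ms means that 95% of the measured calls finished in 15 ms or less.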
The method below accepts two parameters -
List<DEKey> keys - sample data in keys will be USERID=9012
List<String> reqAttrNames - sample data for reqAttrNames will be
[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]
Below is the code -
    public DEResponse getDEAttributes(List<DEKey> keys, List<String> reqAttrNames) {
        DEResponse response = null;
        try {
            String url = buildGetUrl(keys, reqAttrNames);
            if (url != null) {
                List<CallableTask<DEResponse>> tasks = new ArrayList<CallableTask<DEResponse>>();
                CallableTask<DEResponse> task = new DEResponseTask(url);
                tasks.add(task);
                // STEP 2: Execute worker threads for all the generated urls
                List<LoggingFuture<DEResponse>> futures = null;
                try {
                    long waitTimeout = getWaitTimeout(keys);
                    futures = executor.executeAll(tasks, null, waitTimeout, TimeUnit.MILLISECONDS);
                    // STEP 3: Consolidate results of the executed worker threads
                    if (futures != null && futures.size() > 0) {
                        LoggingFuture<DEResponse> future = futures.get(0);
                        response = future.get();
                    }
                } catch (InterruptedException e1) {
                    logger.log(LogLevel.ERROR, "Transport:getDEAttributes Request timed-out :", e1);
                }
            } else {
                //
            }
        } catch (Throwable th) {
        }
        return response;
    }
The method above returns a DEResponse object to me.
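Since the in-house executor is not publicly available, here is a rough stand-in for the same flow (one task, a wait timeout, a null result on failure) using only `java.util.concurrent`. The class and method names are hypothetical, and `ExecutorService.invokeAll` is an assumption standing in for `executeAll`; unlike `executeAll`, it interrupts tasks that miss the timeout.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllSketch {
    // Runs a single task with a deadline. Returns null on timeout or failure,
    // like the null response returned by getDEAttributes.
    static <T> T runWithTimeout(ExecutorService pool, Callable<T> task, long timeoutMs) {
        try {
            List<Future<T>> futures =
                pool.invokeAll(Collections.singletonList(task), timeoutMs, TimeUnit.MILLISECONDS);
            // invokeAll only returns once every future is done or cancelled,
            // so this get() does not block.
            return futures.get(0).get();
        } catch (CancellationException e) {
            return null; // the task did not finish before the timeout
        } catch (ExecutionException e) {
            return null; // the task threw; a real client would log e.getCause()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }
}
```

Note that the calling thread is blocked inside `invokeAll` for the whole duration of the task, which is the same shape as the blocking wait in my method.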
Below is the DEResponseTask class -
    public class DEResponseTask extends BaseNamedTask implements CallableTask<DEResponse> {
        private final ObjectMapper m_mapper = new ObjectMapper();

        @Override
        public DEResponse call() throws Exception {
            URL url = null;
            DEResponse DEResponse = null;
            try {
                if (buildUrl != null) {
                    url = new URL(buildUrl);
                    DEResponse = m_mapper.readValue(url, DEResponse.class);
                } else {
                    logger.log(LogLevel.ERROR, "DEResponseTask:call is null ");
                }
            } catch (MalformedURLException e) {
            } catch (Throwable th) {
            } finally {
            }
            return DEResponse;
        }
    }
Is there anything wrong with the way this multithreaded code is written? If so, how can I make it more efficient?
Below is the signature of the executeAll method of executor. My company has its own executor, which implements the Sun Executor class -
/**
* Executes the given tasks, returning a list of futures holding their
* status and results when all complete or the timeout expires, whichever
* happens first. <tt>Future.isDone()</tt> is <tt>true</tt> for each
* element of the returned list. Upon return, tasks that have not completed
* are cancelled. Note that a <i>completed</i> task could have terminated
* either normally or by throwing an exception. The results of this method
* are undefined if the given collection is modified while this operation is
* in progress. This is entirely analogous to
* <tt>ExecutorService.invokeAll()</tt> except for a couple of important
* differences. First, it cancels but does not <b>interrupt</b> any
* unfinished tasks, unlike <tt>ExecutorService.invokeAll()</tt> which
* cancels and interrupts unfinished tasks. This results in a better
* adherence to the specified timeout value, as interrupting threads may
* have unexpected delays depending on the nature of the tasks. Also, all
* eBay-specific features apply when the tasks are submitted with this
* method.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return a list of futures representing the tasks, in the same sequential
* order as produced by the iterator for the given task list. If the
* operation did not time out, each task will have completed. If it did
* time out, some of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in which case
* unfinished tasks are cancelled
*/
    public <V> List<LoggingFuture<V>> executeAll(Collection<? extends CallableTask<V>> tasks,
                                                 Options options,
                                                 long timeout, TimeUnit unit)
            throws InterruptedException {
        return executeAll(tasks, options, timeout, unit, false);
    }
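The Javadoc above says executeAll cancels unfinished tasks without interrupting them, whereas ExecutorService.invokeAll() cancels and interrupts. To illustrate that difference, here is a small sketch using the standard `Future.cancel(boolean)`. The class name is hypothetical, and how the in-house executor implements its cancellation is not shown here, so this only demonstrates the standard-library behaviour.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelSketch {
    // Starts a task that sleeps briefly, cancels it, and reports whether
    // the worker thread observed an interrupt.
    static boolean workerInterrupted(boolean mayInterrupt) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(200); // stands in for a blocking call
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();             // make sure the task is running
            future.cancel(mayInterrupt); // true: interrupt, false: only mark as cancelled
            finished.await();            // wait until the worker has actually stopped
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With `cancel(false)` the future is reported as cancelled right away, but the worker thread keeps running the task to completion and stays occupied until then.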
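Update:-
As soon as I increase the load on the benchmarking program by raising the number of threads to 20, i.e. newFixedThreadPool(20), this component starts taking time. But I believe the component works fine if I use newSingleThreadExecutor.
The only reason I can think of is that there may be a blocking call in the code above, so the threads get blocked, and that is why it takes time?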
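For context, the load test has roughly the following shape. This is a simplified sketch, not my actual benchmark: the class name, the `measure` method, and the `Runnable work` parameter (which stands in for a call to getDEAttributes) are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LoadSketch {
    // Runs `calls` invocations of `work` on a fixed pool of `threads` threads
    // and returns the time each invocation spent executing, in nanoseconds.
    // Time spent waiting in the pool's queue is not included.
    static List<Long> measure(int threads, int calls, Runnable work) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> jobs = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                jobs.add(() -> {
                    long start = System.nanoTime();
                    work.run();
                    return System.nanoTime() - start;
                });
            }
            List<Long> latencies = new ArrayList<>();
            for (Future<Long> f : pool.invokeAll(jobs)) {
                latencies.add(f.get());
            }
            return latencies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Running `measure(1, n, work)` and `measure(20, n, work)` with the same `work` is the comparison I am describing: one caller thread versus twenty concurrent callers.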
Update:-
So should this part be written like this instead? -
    if (futures != null && futures.size() > 0) {
        LoggingFuture<DEResponse> future = futures.get(0);
        //response = future.get();//replace this with below code-
        while (!future.isDone()) {
            Thread.sleep(500);
        }
        response = future.get();
    }
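For comparison, the standard `java.util.concurrent.Future` interface also has a timed `get(long, TimeUnit)`, which waits up to a deadline without a sleep loop. The sketch below uses a plain `Future`; whether LoggingFuture extends `Future` and supports this call is an assumption I have not confirmed, and the class and method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {
    // Waits at most timeoutMs for the result and returns `fallback`
    // if the result is not available in time or the task failed.
    static <T> T getOrFallback(Future<T> future, long timeoutMs, T fallback) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(false); // give up on the result without interrupting the worker
            return fallback;
        } catch (CancellationException | ExecutionException e) {
            return fallback; // the task was cancelled or threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }
}
```

A plain `future.get()` already blocks until the task is done, so the `isDone()` loop with `Thread.sleep(500)` can only return later than it, by up to 500 ms per call.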