0

我正在开发一个基于 REST 服务的项目,其中我有两个组件,如下所述-

  1. URL's客户端,它将为服务组件提供必要的服务
  2. 然后服务(REST 服务)组件将使用这些URL's从数据库中获取数据。

通常 URL 看起来像这样 -

http://host.qa.ebay.com:8080/deservice/DEService/get/USERID=9012/PROFILE.ACCOUNT,PROFILE.ADVERTISING,PROFILE.DEMOGRAPHIC,PROFILE.FINANCIAL

上面 URL 的含义是-为了USERID- 9012给我这些列的数据库中的数据-

[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]

目前我正在客户端组件方面进行基准测试。我发现下面的方法正在使用time(95 Percentile)一堆~15ms

下面的方法将接受两个参数 -

List<DEKey> keys- sample data in keys will have USERID=9012

List<String> reqAttrNames- sample data for reqAttrNames will be-

[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]

下面是代码-

public DEResponse getDEAttributes(List<DEKey> keys, List<String> reqAttrNames) {

    DEResponse response = null;
    try {
        String url = buildGetUrl(keys,reqAttrNames);

        if(url!=null){
            List<CallableTask<DEResponse>> tasks = new ArrayList<CallableTask<DEResponse>>();
            CallableTask<DEResponse> task = new DEResponseTask(url); 
            tasks.add(task);

            // STEP 2: Execute worker threads for all the generated urls
            List<LoggingFuture<DEResponse>> futures = null;
            try {
                long waitTimeout = getWaitTimeout(keys);
                futures = executor.executeAll(tasks, null, waitTimeout, TimeUnit.MILLISECONDS);

                // STEP 3: Consolidate results of the executed worker threads
                if(futures!=null && futures.size()>0){
                    LoggingFuture<DEResponse> future = futures.get(0);
                    response = future.get();
                }
            } catch (InterruptedException e1) {
                logger.log(LogLevel.ERROR,"Transport:getDEAttributes Request timed-out :",e1);
            }
        }else{
            //
        }
    }  catch(Throwable th) {

    }

    return response;
}

上面的方法会把对象还给我DEResponse

下面是DEResponseTask class

public class DEResponseTask  extends BaseNamedTask implements CallableTask<DEResponse> {

        private final ObjectMapper m_mapper = new ObjectMapper();

        @Override
        public DEResponse call() throws Exception {
            URL url = null;
            DEResponse DEResponse = null;
            try {
                if(buildUrl!=null){
                    url = new URL(buildUrl);

                    DEResponse = m_mapper.readValue(url, DEResponse.class);

                }else{
                    logger.log(LogLevel.ERROR, "DEResponseTask:call is null ");
                }
            } catch (MalformedURLException e) {

            }catch (Throwable th) {

            }finally{
            }

            return DEResponse;
        }
    }

这个多线程代码的编写方式有什么问题吗?如果是,我怎样才能提高效率?

executeAll在我的公司中为方法签名,executor他们有自己的执行程序,它将实现 Sun Executor 类-

/**
     * Executes the given tasks, returning a list of futures holding their 
     * status and results when all complete or the timeout expires, whichever
     * happens first.  <tt>Future.isDone()</tt> is <tt>true</tt> for each
     * element of the returned list.  Upon return, tasks that have not completed
     * are cancelled.  Note that a <i>completed</i> task could have terminated
     * either normally or by throwing an exception.  The results of this method
     * are undefined if the given collection is modified while this operation is
     * in progress.  This is entirely analogous to
     * <tt>ExecutorService.invokeAll()</tt> except for a couple of important
     * differences.  First, it cancels but does not <b>interrupt</b> any 
     * unfinished tasks, unlike <tt>ExecutorService.invokeAll()</tt> which
     * cancels and interrupts unfinished tasks.  This results in a better 
     * adherence to the specified timeout value, as interrupting threads may
     * have unexpected delays depending on the nature of the tasks.  Also, all 
     * eBay-specific features apply when the tasks are submitted with this 
     * method.
     * 
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return a list of futures representing the tasks, in the same sequential
     * order as produced by the iterator for the given task list.  If the 
     * operation did not time out, each task will have completed.  If it did
     * time out, some of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in which case
     * unfinished tasks are cancelled
     */
    public <V> List<LoggingFuture<V>> executeAll(Collection<? extends CallableTask<V>> tasks, 
                                                 Options options, 
                                                 long timeout, TimeUnit unit)
            throws InterruptedException {
        return executeAll(tasks, options, timeout, unit, false);
    }

更新:-

一旦我通过增加线程来增加执行基准测试的程序的负载,这个组件就需要时间20

newFixedThreadPool(20)

但我相信如果我使用这个组件可以正常工作 -

newSingleThreadExecutor

我能想到的唯一原因是,可能在上面的代码中,有一个阻塞调用,所以这就是线程被阻塞的原因,这就是为什么它需要时间?

更新:-

所以这行应该这样写?-

if(futures!=null && futures.size()>0){
                    LoggingFuture<DEResponse> future = futures.get(0);
                    //response = future.get();//replace this with below code-

                    while(!future.isDone()) {
                        Thread.sleep(500);
                    } 

                    response = future.get();
                }
4

2 回答 2

0

如果我正确阅读了您的代码,则存在一个明显的性能问题。这个:

public class DEResponseTask  extends BaseNamedTask implements CallableTask<DEResponse> {
    private final ObjectMapper m_mapper = new ObjectMapper();

每个任务都被调用一次,并且创建ObjectMapper实例非常昂贵。

有很多方法可以解决这个问题,但您可能想要:

  1. 使 m_mapper 引用静态(仅创建一次)——映射器在配置后可以安全共享,或者
  2. 传入共享ObjectMapper(共享是安全的)

这样做应该会对 JSON 处理效率产生很大影响。

于 2013-04-22T21:09:24.727 回答
0

除了您使用的是复杂的非标准执行器之外,我没有看到任何应该导致性能下降的东西。我意识到您对于可以使用哪个 Executor 没有任何选择,但出于好奇,我会尝试用 a 替换它,ThreadPoolExecutor看看这是否有任何区别,并使用现有的权力提出它如果您发现有重大改进,请您的工作 - 在我的工作中,我们发现另一个部门编写的加密库绝对是垃圾(我们 80-90% 的 CPU 时间花费在他们的代码中)并成功游说他们重写它。

编辑:

public class Aggregator implements Runnable {
    private static ConcurrentLinkedQueue<Future<DEResponse>> queue = new ConcurrentLinkedQueue<>();
    private static ArrayList<DEResponse> aggregation = new ArrayList<>();

    public static void offer(Future<DEResponse> future) {
        queue.offer(future);
    }

    public static ArrayList<DEResponse> getAggregation() {
        return aggregation;
    }

    public void run() {
        while(!queue.isEmpty()) { // make sure that all of the futures are added before this loop starts; better still, if you know how many worker threads there are then keep a count of how many futures are in your aggregator and quit this loop when aggregator.size() == [expected number of futures]
            aggregation.add(queue.poll().get());
        }
    }
}

public void getDEAttributes(List<DEKey> keys, List<String> reqAttrNames) {
    try {
        if(url!=null){
            try {
                futures = executor.executeAll(tasks, null, waitTimeout, TimeUnit.MILLISECONDS);
                if(futures!=null && futures.size()>0){
                    Aggregator.offer(futures.get(0));
                }
            }
        }
    }
}
于 2013-04-21T02:23:26.247 回答