0

计费类具有与国家计费相关的所有逻辑。它从数据库中获取结果,然后向用户收费。计费类实现 Runnable。我想根据国家参数并行执行计费,以便快速计费大量用户(500 万+)。现在需要几个小时才能完成。

我正在尝试实现 ThreadPoolExecutor 来执行 Billing 类,但我很困惑如何?以下有什么区别或者我做错了什么?请推荐!!总共有 20 个国家,但我在这里只粘贴 5 个。

 //for 20 countries  ThreadPoolExecutor (20,20,20.......)????

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, 
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());

executor.execute(new Billing("UK")); 
executor.execute(new Billing("USA")); 
executor.execute(new Billing("Germany")); 
executor.execute(new Billing("Spain")); 
executor.execute(new Billing("Italy")); 

或者

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, 
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<5;i++) // for 20 countries   i<20??
{    

   executor.execute(new Billing("UK")); 
   executor.execute(new Billing("USA")); 
   executor.execute(new Billing("Germany")); 
   executor.execute(new Billing("Spain")); 
   executor.execute(new Billing("Italy")); 
}

while (! executor.isTerminated()) {
   try{
      executor.awaitTermination(100, TimeUnit.SECONDS);
   }catch(InterruptedException iE)
   {
      iE.printStackTrace();
      System.out.println("Executor Exception: "+ iE);
   }

提前致谢!!

4

6 回答 6

1

循环解决方案似乎不正确。无需Runnable多次执行相同的操作。

ThreadPoolExecutor您正在使用两者进行实例化corePoolSizemaximumPoolSize设置为5,这意味着执行程序会将池中的线程数保持为5,即使它们处于空闲状态。它还说5池中只能有线程。

有了这个,您可以期望最多5并行运行的线程执行任务(Billing对象)。

当您继续向with方法提交Billing对象时,它们将添加到您提供的方法中。这个队列的大小在这里。在某些情况下,队列可能已经达到其最大容量并且无法执行更多任务,在这种情况下,任务被拒绝并提供给构造函数中提供的任务。它的工作是用已实现的方法处理被拒绝的任务。executorexecuteArrayBlockingQueue10RejectedExecutionHandlerThreadPoolExecutorrejectedExecution

如果您想查找是否有任何被拒绝的任务,您必须提供自己的任务,RejectedExecutionHandler而不是使用默认的ThreadPoolExecutor.CallerRunsPolicy. 你可以这样做:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5,
        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r,
                    ThreadPoolExecutor executor) {
                System.out.println("I got rejected: " + r);
                if (!executor.isShutdown()) {
                    r.run();
                }
            }
        });
于 2013-02-27T16:18:21.200 回答
1

第一:忘记循环

for(int i=0;i<5;i++) // for 20 countries   i<20??
{    

   executor.execute(new Billing("UK")); 
   executor.execute(new Billing("USA")); 
   executor.execute(new Billing("Germany")); 
   executor.execute(new Billing("Spain")); 
   executor.execute(new Billing("Italy")); 
}

这会多次循环所有帐单。

正确的做法是在第一个片段中:

executor.execute(new Billing("UK")); 
executor.execute(new Billing("USA")); 
executor.execute(new Billing("Germany")); 
executor.execute(new Billing("Spain")); 
executor.execute(new Billing("Italy")); 

另一个错误在于终止检查:

while (! executor.isTerminated()) {
   try{
      executor.awaitTermination(100, TimeUnit.SECONDS);
   }catch(InterruptedException iE)
   {
      iE.printStackTrace();
      System.out.println("Executor Exception: "+ iE);
   }
}

的 javadocExecutor.awaitTermination说:

在关闭请求后阻塞,直到所有任务都完成执行,

但你永远不会发出关闭请求。

在您的情况下,您可以利用ExecutorCompletionService如下:

CompletionService<String> ecs = new ExecutorCompletionService<String>(executor);
List<String> countries= Arrays.asList("UK","USA","Germany","Spain","Italy");   
for(String country : countries) {
    ecs.submit(new Billing(country),country);
}
// wait for completion
for(int i=0;i<countries.size();i++){
      ecs.take(); // wait for next country completion
}
// all work completed, shutdown
executor.shutdownNow();
于 2013-02-27T16:41:29.770 回答
0

我不确定你是否理解循环是如何工作的。不同之处在于第二个代码块将在每个列出的国家/地区运行 5 次计费。

于 2013-02-27T16:17:34.307 回答
0

假设您正在谈论for代码的循环部分,那么它是如何工作的并不明显。

理想情况下,循环看起来像这样:

for(String country : countryCollection) {
    executor.execute(new Billing(country));
}
于 2013-02-27T16:19:42.917 回答
0

您是否考虑过使用enum?

static class Billing implements Runnable {

  enum Country {
    UK,
    USA,
    Germany,
    Spain,
    Italy;
  }

  public Billing(Country country) {
  }

  @Override
  public void run() {
  }
}
public void test() {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
       new ArrayBlockingQueue<Runnable>(10), 
          new ThreadPoolExecutor.CallerRunsPolicy());
  for ( Billing.Country country : Billing.Country.values() ) {
    executor.execute(new Billing(country));
  }
}
于 2013-02-27T16:32:32.590 回答
0

考虑实现这一点的另一种方法是查看The Fork/Join Framework。这似乎真的可以从工作中受益。例如,您可以看似相当干净地将其分解。这基本上将允许您分解按用户或用户子集计费的任务,而不是让一个看似代表一个国家/地区的线程通过其所有计费工作。

您可以找到该库的链接:如果您使用的是 Java < 7 版本,请点击此处

于 2013-02-27T16:42:52.040 回答