3

我在 tomcat 服务器上运行了一个 RESTful 风格的 RPC(远程过程调用)API,该 API 在 K 线程上处理具有 M 个任务的 N 个用户的数据。大多数情况下,一个用户有大约 20 到 500 个任务(但 M 可能在 1 到 5000 之间)。一项任务需要大约 10 到 20 秒才能完成,但可能在 1 秒到 20 分钟之间。目前,系统大多只有一个用户,有时最多三个,但在不久的将来会同时增加到大约 10 个用户。我们的服务器有 10 个内核,因此我想使用 10 个线程。目前,每个用户都有 5 个线程进行处理,效果很好。但是 a) 大多数时候机器的利用率只有 50%(这导致针在“30 分钟”范围内等待),有时服务器负载高达 150%。

解决方案要求:

  1. 始终使用 100% 的服务器(如果有任务)
  2. 所有用户在线程执行方面都被同等对待(与其他用户完成的线程数量相同)
  3. 新用户不必等到之前用户的所有任务都完成后(尤其是在 user1 有 5000 个任务而 user2 有 1 个任务的情况下,这一点很重要)

想到的解决方案:

  1. 只需使用具有 10 个线程的 FixedThreadPoolExecutor,违反条件 3

  2. 在我的任务中使用 PriorityBlockingQueue 并实现 compareTo 方法 -> 不能使用 threadpoolExecutors 提交方法(因此我不知道提交的任务何时结束)

  3. 实现类似阻塞队列的“循环”,其中 K 个线程(在我们的例子中为 10 个)以循环方式从 N 个内部队列中获取新任务 -> 为了能够将任务放入正确的队列中,我需要一个“提交”- 接受多个参数的方法(我也需要实现一个 ThreadPoolExecutor)

我试图说明我所说的循环法的含义,比如阻塞队列(如果没有帮助,请随时编辑它):

  --                       --
  --        --        --   --             queue task load, 
  --   --   --   --   --   --        --   one task denoted by --
  --   --   --   --   --   --   --   -- 
| Q1 | Q2 | Q3 | Q4 | Q5 | Q6 | Q7 | QN |
|                      *   ^            |
|                  last|   |next        |
|                           -------------
\                          /
 \    |    |    |    |    |
 | T1 | T2 | T3 | T4 | TK |

是否有一个优雅的解决方案来主要使用 Java 标准 API(或任何其他广泛使用的 Java API)来实现这种处理行为(可能是我提出的解决方案之一或任何其他解决方案)?或者您对如何解决此问题有任何其他提示?

4

3 回答 3

0

我一直在研究类似于循环设置的解决方案。它变得非常复杂,但我想我想出了一个不错的实现。这可能不是一个非常优雅的解决方案,但有一些单元测试显示了一些功能。不幸的是,TaskQ还没有处于“1.0”阶段。

它确实涵盖了您的第 1 到第 3 点:

  • 您可以指定要使用的线程数量,如果有足够的任务,则使用所有线程。
  • 当线程可用时,每个用户都会轮到
  • 如果一个用户 A 有 500 个任务在队列中,而另一个用户 B 有 1 个任务,则用户 B 的任务将在线程可用时立即执行。

目前还没有手册/文档,希望您能抽出时间进行调查。显示一些用法的单元测试在这里,扩展/使用的主要类是RunnableTaskQWithQos

于 2014-12-02T00:49:36.830 回答
0

如果您同意最小化总体任务延迟可以很好地替代需求 2 和 3,并且您有足够好的任务运行时间估计,那么我可能会有答案。

您将任务提交时间与每个任务一起存储,以便以后始终可以计算其估计延迟。然后,您可以构建一个 PriorityBlockingQueue,当插入一个新任务时,它总是将其插入到提供一些公平性并尝试最小化整体延迟的队列位置。首先,这将使长时间运行的任务处于不利地位。我自己没有尝试过,但我会根据您估计的运行时间分配任务优先级,并使用估计的Runtime-waitingTime 作为优先级(首先采取最低优先级的工作)。这将为繁重的任务在等待足够长的时间以具有负优先级后提供机会。在那之前,轻量级任务将更有可能成为第一名,即使它们刚刚提交。不过,这种安排只会在您的估计允许的情况下如此公平。

至于循环要求:如果这真的很重要,您也可以在队列中处理它。基本上,当您使用线程池时,您可以根据在队列中插入新作业的位置来实施您的调度策略。如果您可以估计作业延迟,您也可以通过在正确的位置插入来平衡您的用户。

于 2014-12-01T18:14:19.667 回答
0

满足您的要求:

1) 最大化线程使用:任何 ThreadPoolExecutor 都会处理这个问题。
2)所有用户都被同等对待:本质上需要循环设置。
3)避免新用户按FIFO顺序等待:同#2。

您还提到了提交和获得结果的能力。

您可能会考虑PriorityBlockingQueue<Job>使用包装器对象进行独立,例如:

class Job implements Comparable<Job> {
    private int priority;
    private YourCallable task;

    public Job(int priority, YourCallable task) {
        this.priority = priority;
        this.task = task;
    }

    @Override
    public int compareTo(Job job) {
        // use whatever order you prefer, based on the priority int
    }
}

您的生产者向 PriorityBlockingQueue 提供了一个分配了优先级的作业(基于您的循环规则或其他),以及一个实现 Callable 的任务。然后你的消费者会为一个 Job 做 queue.poll。

一旦掌握了它,您就可以获取包含在该 Job 对象中的任务并将其发送到您选择的 ThreadPoolExecutor 上进行处理。

于 2014-12-01T18:49:09.950 回答