10

我们正在开发一个应用程序,其中一组对象可以通过接收来自 3 个不同来源的消息而受到影响。每条消息(来自任何来源)都有一个对象作为其目标。每个消息接收器都将在自己的线程上运行。

我们希望消息的处理(接收后)尽可能高速,因此针对目标对象的消息处理将由线程池中的另一个线程完成。消息的处理将比阅读/接收来自发送者的消息花费更长的时间。

我认为如果池中的每个线程仅专用于一组特定的对象,它会更快,例如:

Thread1 -> objects named A-L
Thread2 -> objects named M-Z

每组对象(或线程)都有一个专用的待处理消息队列。

我的假设是,如果唯一需要的线程同步是在每个接收线程和一个处理线程之间,在需要将消息放入阻塞队列的持续时间内,它会比随机分配工作线程来处理更快消息(在这种情况下,可能有 2 个不同的线程为同一对象提供消息)。

我的问题实际上是两部分:

  1. 人们是否同意将工作线程专用于一组特定对象是一种更好/更快的方法的假设?

  2. 假设这是一种更好的方法,现有的 Java ThreadPool 类是否有办法支持这一点?还是需要我们编写自己的 ThreadPool 实现?

感谢您提供的任何建议。

4

5 回答 5

10

[是否] 将工作线程专用于一组特定的对象是一种更好/更快的方法吗?

我假设总体目标是尝试最大化这些入站消息的并发处理。您有来自 3 个来源的接收器,它们需要将消息放入将得到最佳处理的池中。因为来自 3 个源中的任何一个的消息可能会处理无法同时处理的相同目标对象,所以您希望以某种方式划分消息以便可以同时处理它们,但前提是它们保证不会引用相同的目标对象。

我将hashCode()在您的目标对象(可能只是name.hashCode())上实现该方法,然后使用该值将对象放入一个BlockingQueues 数组中,每个对象都有一个使用它们的线程。使用数组Executors.newSingleThreadExecutor()就可以了。通过队列数修改哈希值模式并将其放入该队列中。您需要将处理器的数量预先定义为最大值。取决于处理的 CPU 密集程度。

所以类似下面的代码应该可以工作:

 private static final int NUM_PROCESSING_QUEUES = 6;
 ...
 ExecutorService[] pools = new ExecutorService[NUM_PROCESSING_QUEUES];
 for (int i = 0; i < pools.length; i++) {
    pools[i] = Executors.newSingleThreadExecutor();
 }
 ...
 // receiver loop:
 while (true) {
    Message message = receiveMessage();
    int hash = Math.abs(message.hashCode());
    // put each message in the appropriate pool based on its hash
    // this assumes message is runnable
    pools[hash % pools.length].submit(message);
 }

这种机制的好处之一是您可以限制有关目标对象的同步。你知道同一个目标对象只会被一个线程更新。

人们是否同意将工作线程专用于一组特定对象是一种更好/更快的方法的假设?

是的。这似乎是获得最佳并发性的正确方法。

假设这是一种更好的方法,现有的 Java ThreadPool 类是否有办法支持这一点?还是需要我们编写自己的 ThreadPool 实现?

我不知道有任何线程池可以做到这一点。但是,我不会编写您自己的实现。只需像上面的代码大纲一样使用它们。

于 2012-11-12T18:36:51.510 回答
3

一般来说,这样的方法是一个坏主意。它属于“不要及早优化”的口头禅。

此外,如果实施你的想法可能会损害你的表现,而不是帮助它。一个无法正常工作的简单示例是,如果您突然在一种类型上收到大量请求 - 另一个工作线程将处于空闲状态。

最好的方法是使用标准的生产者-消费者模式,并通过在各种负载下进行系统测试来调整消费者线程的数量——最好是通过输入真实交易的记录。

这些情况的“转到”框架是java.util.concurrent包中的类。我建议使用一个BlockingQueue(可能是一个ArrayBlockingQueue)和一个ExecutorServiceExecutors工厂方法之一创建的,可能是newCachedThreadPool().


一旦您实施并进行了系统测试,如果您发现已证实的性能问题,请分析您的系统,找到瓶颈并修复它。

你不应该及早优化的原因是大多数时候问题不在你期望的地方

于 2012-11-12T18:23:12.403 回答
0

作为替代方法:我建议为此使用现有框架,例如RabbitMQActiveMQ。尝试发明自己的消息传递框架可能是一个挑战。如果您尝试使用自己的框架增加价值,那是一回事。如果您只需要一个来实现您的目标,那就是另一个。这些框架提出了许多优化消息传递的选项,值得考虑。

于 2012-11-12T17:41:33.583 回答
0

我的回答是:

  • 1 - 是
  • 2 -
    • a) 没有
    • b) 你不需要

一些解释:

  • 您希望一项任务根据某种算法将消息分发到不同的队列,
  • 您希望每个消息队列有一个任务从其分配的队列中提取消息并处理它们。

我不认为这些前提与线程池的目的相矛盾,线程池只是将任务与线程相关联。但是在这个模型中,线程池只会将线程与任务关联一次,然后线程将继续运行以轮询其输入消息队列。

线程的摩擦点应该是中间消息队列,可能还有与这些消息处理相关的其他资源。根据您的解释,我想您计划通过巧妙地将消息处理划分为任务来将第二种类型减少到最低限度。每个队列只能由与队列关联的分区任务和处理任务访问,因此它应该是最小的。

于 2012-11-12T18:03:58.757 回答
0

您应该能够为 ThreadPoolExecutor 提供一个特殊的 BlockingQueue。队列会记住哪个线程正在处理哪种类型的消息,以便它可以保留所有相同类型的消息。

MyQueue

    ownership relation of thread - msgType 

    take/poll()

        if current thread owns msg type X
            if there is a message of type X
                return that message
            else
                give up ownership

        // current thread does not own any message type
        if there is a messsage of type Y, Y is not owned by any thread
            current thread owns Y
            return that message

        // there's no message belonging to an unowned type
        wait then retry 
于 2012-11-12T19:18:54.833 回答