10

我有一个Job Distributor在不同的网站上发布消息的人Channels

此外,我希望有两个(以及未来更多)Consumers从事不同的任务并在不同的机器上运行。(目前我只有一个,需要扩展它)

让我们为这些任务命名(只是示例):

  • FIBONACCI(生成斐波那契数)
  • RANDOMBOOKS(生成随机句子来写一本书)

这些任务最多运行 2-3 个小时,应该平均分配给每个Consumer.

每个消费者都可以有x 并行线程来处理这些任务。所以我说:(这些数字只是示例,将被变量替换)

  • 机器 1 可以消耗 3 个并行作业FIBONACCI和 5 个并行作业RANDOMBOOKS
  • 机器 2 可以消耗 7个并行作业FIBONACCI和 3 个并行作业RANDOMBOOKS

我怎样才能做到这一点?

我是否必须x为每个线程启动每个线程Channel才能收听Consumer

我什么时候必须承认这一点?

我目前的唯一方法Consumer是:为每个任务启动x线程 - 每个线程都是一个 Defaultconsumer 实现Runnable。在handleDelivery方法中,我打电话basicAck(deliveryTag,false)然后做工作。

进一步:我想将一些任务发送给一个特殊的消费者。如何结合上述公平分配来实现这一目标?

这是我的代码publishing

String QUEUE_NAME = "FIBONACCI";

Channel channel = this.clientManager.getRabbitMQConnection().createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

channel.basicPublish("", QUEUE_NAME,
                MessageProperties.BASIC,
                Control.getBytes(this.getArgument()));

channel.close();

这是我的代码Consumer

public final class Worker extends DefaultConsumer implements Runnable {
    @Override
    public void run() {

        try {
            this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
            this.getChannel().basicConsume(this.jobType.toString(), this);

            this.getChannel().basicQos(1);
        } catch (IOException e) {
            // catch something
        }
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Control.getLogger().error("Exception!", e);
            }

        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        this.getChannel().basicAck(deliveryTag, false); // Is this right?
        // Start new Thread for this task with my own ExecutorService

    }
}

在这种情况下,课程Worker开始了两次:一次为FIBUNACCI一次,一次为RANDOMBOOKS

更新

正如答案所述,RabbitMQ 不是最好的解决方案,但 Couchbase 或 MongoDB 拉取方法是最好的。我是这些系统的新手,有没有人可以向我解释,这将如何实现?

4

5 回答 5

7

这是我将如何在 couchbase 上构建它的概念视图。

  1. 您有一些机器来处理工作,并且有一些机器(可能是相同的机器)创建工作要做。
  2. 您可以在 couchbase 的存储桶中为每个作业创建一个文档(如果您将其与该存储桶中的其他数据混合,则将其类型设置为“作业”或其他内容)。
  3. 每个工作描述,连同要完成的具体命令,可能包括它的创建时间、到期时间(如果有特定的到期时间)和某种生成的工作价值。这个工作值可以是任意单位。
  4. 每个工作的消费者都会知道它一次可以做多少个工作单元,以及有多少可用(因为其他工人可能正在工作。)
  5. 因此,一台具有 10 个工作单元容量且有 6 个工作单元正在完成的机器将执行查询以查找 4 个或更少工作单元的工作。
  6. 在 couchbase 中,有一些视图是增量更新的 map/reduce 作业,我认为您在这里只需要 map 阶段。您将编写一个视图,让您查询到期时间、输入系统的时间和工作单元的数量。这样你就可以得到“4个工作单位或更少的最迟到的工作”。
  7. 这种查询,随着容量的释放,将首先获得过期最多的工作,尽管您可以获得最大的过期工作,如果没有,那么最大的未过期工作。(其中“逾期”是当前时间与作业截止日期之间的差值。)
  8. Couchbase 视图允许像这样非常复杂的查询。虽然它们是增量更新的,但它们并不是完全实时的。因此,您不会寻找一份工作,而是一份求职者名单(按您的意愿排序。)
  9. 因此,下一步是获取职位候选人列表并检查第二个位置 - 可能是锁定文件的 membase 存储桶(例如:RAM 缓存,非持久性)。锁定文件将有多个阶段(在这里,您使用 CRDT 或最适合您需要的任何方法进行一些分区解析逻辑。)
  10. 由于此存储桶是基于 ram 的,因此它比视图更快,并且与总状态的延迟更少。如果没有锁定文件,则创建一个状态标志为“临时”的文件。
  11. 如果另一个工作人员获得相同的工作并看到锁定文件,那么它可以跳过该工作候选人并执行列表中的下一个工作。
  12. 如果,不知何故,两个工人试图为同一个工作创建锁定文件,就会发生冲突。在发生冲突的情况下,您可以平底船。或者,您可以有逻辑,其中每个工作人员对锁定文件进行更新(CRDT 解析,因此使这些幂等,以便可以合并兄弟姐妹)可能放入一个随机数或某个优先级数字。
  13. 在指定的时间段(可能是几秒钟)后,锁文件由工作人员检查,如果它没有进行任何竞争解决更改,它会将锁文件的状态从“临时”更改为“已采取” "
  14. 然后,它使用“已接受”或类似的状态更新工作本身,以便当其他工作人员正在寻找可用工作时它不会出现在视图中。
  15. 最后,您需要添加另一个步骤,在执行查询以获取我上面描述的这些求职者之前,您执行一个特殊查询以查找已采取的工作,但所涉及的工人已经死亡。(例如:过期的工作)。
  16. 知道工人何时死亡的一种方法是,放入 membase 存储桶中的锁定文件应该有一个到期时间,这将导致它最终消失。可能这个时间很短,工作人员只需触摸它即可更新到期时间(这在 couchbase API 中得到支持)
  17. 如果一个工人死了,最终它的锁文件会消失,孤立的工作将被标记为“已接受”但没有锁文件,这是寻找工作的工人可以寻找的条件。

所以总而言之,每个工作人员都会查询孤立的作业,如果有的话,依次检查是否有锁文件,如果没有,则创建一个,并遵循上述正常的锁定协议。如果没有孤立的作业,那么它会查找过期作业,并遵循锁定协议。如果没有过期的作业,那么它只取最旧的作业并遵循锁定协议。

当然,如果您的系统没有“过期”之类的东西,这也将起作用,并且如果及时性无关紧要,那么您可以使用另一种方法来代替最旧的工作。

另一种方法可能是在 1-N 之间创建一个随机值,其中 N 是一个相当大的数字,例如工人数量的 4 倍,并用该值标记每个工作。每次工人去寻找工作时,它都可以掷骰子,看看是否有任何具有该编号的工作。如果没有,它将再次这样做,直到找到具有该号码的工作。这样,与其让多个工作人员争夺少数“最旧”或最高优先级的工作,而且更有可能发生锁争用,不如将它们分散开来……以队列中的时间为代价,比先进先出的情况更随机.

随机方法也可以应用于必须适应负载值的情况(这样单台机器不会承担太多负载),而不是采用最旧的候选者,而是从列出可行的工作并尝试去做。

编辑添加:

在第 12 步中,我说“可能放入一个随机数”,我的意思是,如果工人知道优先级(例如:哪个人最需要做这项工作),他们可以将代表这个的数字放入文件中。如果没有“需要”这份工作的概念,那么他们都可以掷骰子。他们用他们的骰子角色更新这个文件。然后他们两个都可以看看它,看看对方滚了什么。如果他们输了,他们就会下注,而其他工人知道它有它。通过这种方式,您可以在没有大量复杂协议或协商的情况下解决哪个工人接受工作。我假设两个工作人员都在这里访问同一个锁文件,它可以通过两个锁文件和一个找到所有锁文件的查询来实现。如果经过一段时间后,

于 2013-03-16T22:43:01.483 回答
5

首先让我说我没有使用 Java 与 RabbitMQ 通信,所以我无法提供代码示例。但是,这应该不是问题,因为这不是您要问的。这个问题更多的是关于您的应用程序的一般设计。

让我们分解一下,因为这里有很多问题。

将任务分配给不同的消费者

这样做的一种方法是使用循环,但这是相当粗糙的,并且没有考虑到不同的任务可能需要不同的时间来完成。那么该怎么办。这样做的一种方法是将 设置prefetch1。预取意味着消费者将消息缓存在本地(注意:消息尚未被消费)。通过将此值设置为 1,不会发生预取。这意味着您的消费者只会知道并且只有它当前正在处理的消息在内存中。这使得只有在工作人员空闲时才接收消息成为可能。

何时确认

通过上述设置,可以从队列中读取消息,将其传递给您的线程之一,然后确认消息。对所有可用线程执行此操作 -1。您不想确认最后一条消息,因为这意味着您将打开以接收另一条您无法传递给您的工作人员的消息。当其中一个线程完成时,即是您确认该消息的时间,这样您的线程将始终与某些东西一起工作。

传递特殊信息

这取决于你不想做什么,但总的来说,我会说你的制作人应该知道他们在传递什么。这意味着您可以将它发送到某个交换器,或者更确切地说,使用某个路由键将这个消息传递到一个适当的队列,该队列将有一个消费者听它,知道如何处理该消息。

我建议您阅读 AMQP 和 RabbitMQ,这可能是一个很好的起点

警告

我的提议和您的设计中存在一个重大缺陷,那就是ACK我们在实际处理完消息之前就已发送消息。这意味着当(不是如果)我们的应用程序崩溃时,我们无法重新创建ACKed消息。如果您事先知道要启动多少个线程,则可以解决此问题。我不知道您是否可以动态更改预取计数,但不知何故我对此表示怀疑。

一些想法

从我对 RabbitMQ 的经验来看,你不应该害怕创建交换和队列,如果做得正确,这些可以极大地改进和简化你的应用程序设计。也许你不应该有一个启动一堆消费者线程的应用程序。相反,您可能希望有某种包装器,它根据系统中的可用内存或类似的东西启动消费者。如果您这样做,您可以确保在您的应用程序崩溃时不会丢失任何消息,因为如果您这样做,您当然会在完成消息时确认该消息。

推荐阅读

如果有什么不清楚的地方或者我错过了你的观点,请告诉我,如果可以的话,我会尝试扩展我的答案或改进它。

于 2013-03-13T23:45:22.947 回答
3

以下是我对你的问题的看法。正如@Daniel 在他的回答中提到的那样,我相信这更多的是架构原则问题而不是实现问题。一旦架构清晰,实现就变得微不足道。

首先,我想解决一些与调度理论有关的问题。您在这里有非常长时间运行的任务,如果没有以正确的方式安排它们,您将 (a) 最终以低于满负荷运行您的服务器或 (b) 完成任务的时间比其他方式可能要长得多. 所以,我有一些与你的调度范式有关的问题:

  1. 你有能力估计每项工作需要多长时间吗?
  2. 这些工作是否有与之相关的截止日期,如果有,它是如何确定的?

RabbitMQ 在这种情况下是否合适?

我不相信 RabbitMQ 是调度极其长时间运行的作业的正确解决方案。事实上,我认为您之所以会遇到这些问题,是因为 RabbitMQ 不是适合这项工作的工具。默认情况下,在将作业从队列中删除以确定接下来应该处理哪些作业之前,您对作业没有足够的了解。其次,正如@Daniel 的回答中所提到的,您可能无法使用内置的 ACK 机制,因为每当与 RabbitMQ 服务器的连接失败时,重新排队的作业可能会很糟糕。

相反,我会寻找诸如 MongoDB 或 Couchbase 之类的东西来存储您的“队列”作业。然后,您可以完全控制调度逻辑,而不是依赖 RabbitMQ 强制执行的内置循环。

其他注意事项:

此外,我希望有两个(以及将来更多)从事不同任务并在不同机器上运行的消费者。(目前我只有一个,需要扩展它)

在这种情况下,我认为您不想使用基于推送的消费者。相反,使用基于拉取的系统(在 RabbitMQ 中,这将被称为 Basic.Get)。通过这样做,您将负责作业调度

消费者 1 有 3 个 FIBONACCI 线程和 5 个 RANDOMBOOKS 线程。消费者 2 有 7 个 FIBONACCI 线程和 3 个 RANDOMBOOKS 线程。我怎样才能做到这一点?

在这种情况下,我不确定我是否理解。您是否有fibonacci一份工作,并且您正在以某种方式在您的服务器上并行执行它?还是您希望您的服务器同时执行许多fibonacci作业?假设是后者,您将创建线程以在服务器上完成工作,然后将作业分配给它们,直到所有线程都已满。当一个线程可用时,您将轮询队列以启动另一个作业。

您的其他问题:

  • 我是否必须为每个 Channel 启动 x Threads 才能监听每个 Consumer ?
  • 我什么时候必须承认这一点?
  • 我目前只针对一个消费者的方法是:为每个消费者启动 x 个线程
  • 任务 - 每个线程都是实现 Runnable 的 Defaultconsumer。在handleDelivery 方法中,我调用了basicAck(deliveryTag,false) 然后进行工作。
  • 进一步:我想将一些任务发送给一个特殊的消费者。如何结合上述公平分配来实现这一目标?

我相信,一旦您将调度责任从 RabbitMQ 服务器转移到您的个人消费者身上,上述问题将不再是问题(消费者,我的意思是消费线程)。此外,如果您使用更多由数据库驱动的东西(例如 Couchbase),您将能够自己编程这些东西,并且您可以完全控制逻辑。

使用沙发底座

虽然如何将 Couchbase 用作队列的详细说明超出了此问题的范围,但我可以提供一些建议。

  • 首先,您需要阅读Couchbase
  • 我建议将作业存储在 Couchbase 存储桶中,并依靠索引视图列出可用作业。如何为每个作业定义密钥有很多选项,但作业本身需要序列化为 JSON。也许使用ServiceStack.Text
  • 当一个作业被拉出来进行处理时,需要一些逻辑来标记作业在 Couchbase 中的状态。您将需要使用一种CAS方法来确保其他人没有在您拥有的同一时间接受该工作进行处理。
  • 您将需要某种策略来清除队列中失败和已完成的作业。

概括

  1. 不要为此使用 RabbitMQ
  2. 使用每个作业的参数提出智能调度算法。一旦我对你的工作性质有了更多的了解,我就可以帮助你。
  3. 根据 #2 中的算法将作业拉入工作人员,而不是从服务器推送它们。
  4. 想出自己的方式来跟踪整个系统中的作业状态(排队、运行、失败、成功等)以及何时/是否重新调度停滞的作业。
于 2013-03-14T00:26:54.747 回答
1

如果正在使用 spring 或者愿意使用 spring,那么你可以使用 spring listener 容器支持来实现它。这将为您提供您正在寻找的类似回调类型的编程模型。

Spring AMQP 参考文档中的示例代码

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}
于 2012-09-05T09:21:32.170 回答
0

最近我推送了分支 bug18384,它改变了将回调发送到消费者实现的方式。

在此更改之后,Connection 维护了一个调度线程,用于向消费者发送回调。这释放了消费者调用 Connection 和 Channel 上的阻塞方法。

Twitter 上出现了一个关于如何使这个可配置的问题,允许将自定义 Executor 插入 ConnectionFactory。我想概述为什么这很复杂,讨论一个可能的实现,看看是否有很多兴趣。

首先,我们应该确定每个消费者应该只在一个线程中接收回调。如果不是这种情况,那么混乱将会接踵而至,消费者将需要担心他们自己的线程安全,而不是初始化安全。

所有消费者只有一个调度线程,这种消费者线程配对很容易得到尊重。

当我们引入多线程时,我们要保证每个Consumer只与一个线程配对。使用 Executor 抽象时,这可以防止每个回调调度被包装在 Runnable 中并发送到 Executor,因为您无法保证将使用哪个线程。

为了解决这个问题,可以将 Executor 设置为运行“n”个长时间运行的任务(n 是 Executor 中的线程数)。这些任务中的每一个都从队列中提取调度指令并执行它们。每个消费者都与一个调度指令队列配对,可能是循环分配的。这并不太复杂,并且会在 Executor 中的线程之间提供简单的调度负载平衡。

现在,仍然存在一些问题:

  1. Executor 中的线程数不一定是固定的(与 ThreadPoolExecutor 一样)。
  2. 没有办法,通过 Executor 或 ExecutorService 找出有多少线程。因此,我们无法知道要创建多少个调度指令队列。

但是,我们当然可以引入 ConnectionFactory.setDispatchThreadCount(int)。在幕后,这将创建一个 Executors.newFixedThreadPool() 以及正确数量的调度队列和调度任务。

我很想知道是否有人认为我忽略了解决这个问题的一些更简单的方法,事实上这是否值得解决。

于 2013-03-20T10:38:26.507 回答