2

我有一个基于 Apache Camel 的中间件,它执行如下事务:

from("amq:job-input")
  to("inOut:businessInvoker-one") // Into business processor
  to("inOut:businessInvoker-two")
  to("amq:job-out");

目前它工作得很好。但我无法扩大规模,比如说从 100 TPS 到 500 TPS。我已经

  1. 提高了并发消费者设置并使用了空的 businessProcessor
  2. 配置 JAVA_XMX 和 PERMGEN

以加快交易速度。

根据 Active MQ Web 控制台,在 500TPS 场景中等待处理的消息非常多。我想,解决方案之一是扩大 ActiveMQ。所以我想在集群中使用多个代理。

根据http://fuse.fusesource.org/mq/docs/mq-fabric.html(“拓扑”部分),以集群模式配置ActiveMQ适用于非持久消息。恕我直言,确实不合适,因为所有正在运行的代理都使用相同的存储文件。但是,分离存储文件呢?现在有可能吧?

有人可以解释一下吗?如果不可能,负载平衡持久消息的最佳方法是什么?

谢谢

4

5 回答 5

1

您可以通过创建 2 个主/从对来分担持久消息的负载。master 和 slave 通过数据库或共享文件系统共享它们的状态,因此您需要复制该设置。

创建 2 个主从对,并在 2 个对之间配置所谓的“网络连接器”。这将使您的性能翻倍,而不会丢失消息。

http://activemq.apache.org/networks-of-brokers.html

于 2015-03-10T13:33:54.723 回答
0

添加消费者可能会有所帮助(取决于您的服务器拥有的核心/CPU数量)。在您的“Camel 服务器”利用所有可用 CPU 进行业务处理之外添加线程是没有意义的,并且可以提高生产力。

可能需要添加更多 ActiveMQ 机器。您可以使用 ActiveMQ“网络”在具有分离的持久性文件的实例之间进行通信。添加更多代理并将它们放入网络应该是直截了当的。

确保您在路上进行性能测试,以确保代理可以处理什么样的负载以及骆驼处理器可以处理什么样的负载(如果在不同的机器上)。

当您进行持久消息传递时 - 您可能还需要事务。确保您正在使用它们。

于 2013-08-14T22:23:33.803 回答
0

如果所有正在运行的代理使用相同的存储文件或支持 tx 的数据库进行持久化,那么只有第一个启动的代理将处于活动状态,而其他代理则处于待机模式,直到第一个失去锁定。

如果您想对持久性进行负载平衡,我们可以尝试两种方法:

  1. 在网桥模式下配置多个代理,然后将消息发送给任何一个,并从多个代理发送消费者消息。它可以对代理进行负载平衡,并对持久性进行负载平衡。
  2. 覆盖persistenceAdapter,使用分库中间件(如tddl:https ://github.com/alibaba/tb_tddl )按分区存储消息。
于 2013-08-12T12:30:51.440 回答
0

此答案与添加骆驼详细信息之前的问题版本有关。

目前尚不清楚您想要负载平衡的具体内容以及原因。跨消费者的消息?跨经纪人的生产者?你想解决什么样的问题?

一般来说,您应该避免使用代理网络,除非您尝试解决某种地理用例、有太多连接无法让单一代理处理,或者如果单个代理(可能是在 HA 中配置的一对代理)没有给你你需要的吞吐量(在 90% 的情况下它会)。

在代理网络中,每个节点都有自己的存储,并通过称为存储转发的机制传递消息。阅读了解代理网络以了解其工作原理。

ActiveMQ 已经作为一种负载均衡器工作,它以循环方式在队列上的订阅者之间均匀地分发消息。因此,如果您在队列中有 2 个订阅者,并向其发送消息 A、B、C、D 流;一个订阅者将收到 A & C,而另一个订阅者将收到 B & D。

如果您想更进一步,将相关消息分组到队列中,以便仅由一个订阅者一致处理,您应该考虑Message Groups

于 2013-08-12T12:34:56.857 回答
-1

您的第一步是增加从 ActiveMQ 处理的工作人员的数量。这样做的方法是将?concurrentConsumers=10属性添加到起始 URI。默认行为是只有一个线程从该端点消费,导致 ActiveMQ 中的消息堆积。添加更多经纪人将无济于事。

其次,您似乎正在做的事情可以从分阶段事件驱动架构 (SEDA) 中受益。在 SEDA 中,处理被分解为多个阶段,这些阶段可以有不同数量的消费者来平衡吞吐量。从 ActiveMQ 消耗的线程只执行该过程的一个步骤,将 Exchange 移交到下一阶段,然后返回从输入队列中提取消息。

因此,您的路线可以重写为 2 条较小的路线:

from("activemq:input?concurrentConsumers=10").id("FirstPhase")
    .process(businessInvokerOne)
    .to("seda:invokeSecondProcess");

from("seda:invokeSecondProcess?concurentConsumers=20").id("SecondPhase")
    .process(businessInvokerTwo)
    .to("activemq:output");

这两个阶段可以有不同数量的并发消费者,以便输入队列的消息消费率与输出率相匹配。如果其中一个调用程序比另一个慢得多,这将很有用。

如果您想要消息持久性,可以将seda:端点替换为另一个中间端点。activemq:

最后,为了提高吞吐量,您可以通过分析调用程序本身并优化该代码,专注于使处理本身更快。

于 2013-08-13T10:07:49.080 回答