3

我们正在运行 ActiveMQ 5.6.0。在我们的测试环境中,我们有 3 个代理在静态网络中运行。这是当前的情况。我们有 6 个消费者随机连接到 3 个代理。一个代理有 3 个消费者,第二个有 2 个,第三个有 1 个。当我们将消息堆积到队列中时,我们看到消息积压在具有 1 个消费者的第三个代理上,其他两个代理没有任何的积压和剩余 5 个消费者处于空闲状态。

您将在下面找到我们所有一个代理 (dev.queue01) 的配置,其他 2 个与静态主机名的适当更改类似。

我希望消息会自动分发给其他代理以供空闲消费者使用。如果我在问题描述中遗漏了什么,请告诉我。提前感谢您的任何指导。

http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-核心.xsd">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="prd.queue01" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb"> 
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb" optimizedDispatch="true" enableAudit="false" prioritizedMessages="true"> 
              <networkBridgeFilterFactory>
                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
              </networkBridgeFilterFactory>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>

    <managementContext>
        <managementContext createConnector="true"/>
    </managementContext>

    <persistenceAdapter>
        <amqPersistenceAdapter directory="${activemq.data}/data/amqdb"/>
    </persistenceAdapter>

      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="256 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="750 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="750 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterClientsOnRemove="true" rebalanceClusterClients="true"/>
    </transportConnectors>

    <networkConnectors>
      <networkConnector uri="static:(tcp://dev.queue02:61616,tcp://dev.queue03:61616)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="4">
      <dynamicallyIncludedDestinations>
        <queue physicalName=">"/> 
      </dynamicallyIncludedDestinations>
      <excludedDestinations>
        <topic physicalName=">"/> 
      </excludedDestinations>
    </networkConnector>
</networkConnectors>


</broker>
<import resource="jetty.xml"/>

4

3 回答 3

3

迟到的答案,但希望它可以帮助未来的读者。

您已经描述了一个代理网络环,其中 B1、B2 和 B3 都相互通信,B1 上有 3 个消费者(C1-C3),B2 上有 2 个消费者(C4 和 C5),还有 1 个消费者(C6)在 B3 上。你没有描述你的消息是在哪里产生的(他们首先去哪个代理),但是假设它是 B3。(B3 将产生与您的描述最准确匹配的最坏情况,但无论在何处生成消息,您仍然会看到负载不均。)

B3 有三个附加的消费者:C6、B1 和 B2。该代理将在这些消费者之间循环消息,因此 1/3 的消息将发送到 C6,1/3 发送到 B1,1/3 发送到 B2。

B1 有五个附加消费者:C1、C2、C3、B2 和 B3。但是消息不会被传递到它们刚刚来自的同一代理,因此有 4 个消费者计算来自 B3 的消息:C1、C2、C3 和 B2。因此,在全部消息的 1/3 中,C1、C2 和 C3 将分别获得 1/4(总数的 1/12),而 B2 将获得相同的总数的 1/12。稍后会详细介绍。

B2 有四个附加消费者:C4、C5、B1 和 B3。但是消息不会被传递到它们刚刚来自的同一代理,因此有 3 个消费者计算来自 B3 的消息:C4、C5 和 B1。因此,在全部消息的 1/3 中,C4 和 C5 将分别获得 1/3(总数的 1/9),而 B1 将获得相同的总数的 1/9。稍后还会详细介绍。

到目前为止,我们已经看到 C6 获得总消息的 1/3,C1-C3 获得总消息的 1/12,C4-C5 获得总消息的 1/9,1/12 + 1/9 = 7 /36 总消息路由到第二个代理。现在让我们回到那些消息。

在遵循 B3 -> B1 -> B2 路径的消息中(占总数的 1/12),它们将在 C4 和 C5 之间循环(因为消息无法返回到其原始代理 B3),因为每条消息总数的 1/24。所以 C4 和 C5 将收到总数的 1/9 + 1/24 = 11/72。

类似地,在经过 B3 -> B2 -> B1 路径的消息中(占总数的 1/9),它们将在 C1、C2 和 C3 之间循环,因此 C1、C2 和 C3 将收到1/12 + 1/27 = 总数的 13/108。

在经过 B3 -> B1 -> B2 -> B3 路径的消息中(占总数的 1/36),一半进入 C6(占总数的 1/72),一半进入 B1(占总数的 1/72)总数)。同样,在经过 B3 -> B2 -> B1 -> B3 路径的消息中(总数的 1/36),一半进入 C6(总数的 1/72),一半进入 B2(1/总数中的 72 个)。所以 C6 获得了 1/36 的消息(总共 13/36),B1 获得了总数的 1/72,B2 获得了总数的 1/72。

我们现在正进入收益递减状态,但您现在可以看到,C6 在总消息中获得了超大份额(36%),而连接到 B1(拥有最多消费者)的消费者每个都获得了较小的份额(低于 10%),导致 C6 有很多工作要做,而 C1-5 的工作要少得多,并且如您所观察到的那样花费时间闲置。您还可以看到某些消息如何通过网络经过很长的路径导致高延迟,但这不是您的问题。

于 2014-08-01T21:54:30.990 回答
1

如果我理解正确,经纪人的意思是在这里排队。

  • 您所有的经纪人都有相同类型的对象。
  • 你所有的消费者都做同样的过程。
  • 您希望在您的消费者之间平均分担工作量。
  • 操作顺序并不那么重要。

我尝试在 Active MQ 5.5.1 上做同样的事情。我所做的只是创建了一个队列,并创建了多个消费者。我将所有消费者指向同一个队列。

Active-MQ 自动处理分发。

我观察到以下示例:

如果我有一个队列 - 有 2000 条记录。如果我同时将 2 个消费者指向该队列,第一个消费者将从 0 开始处理对象。第二个消费者将在随机偏移后开始处理对象(比如从 700 开始。)

一旦,第一个消费者已经完成了从 0 - 700 的处理对象,并且第二个消费者已经处理了 200 条记录(700 - 900),第一个消费者可以开始从任何随机偏移量(可能是 1200 )获取对象。

偏移量的调整由 ActiveMQ 自动控制。

我观察到了这一点。我非常确定会发生这种情况。

希望我已经回答了您的问题(或至少正确理解了您的问题。)。

我在这里不明白的是,如果 Active-MQ 创建 QUEUES,它是如何从介于两者之间的某个地方提供对象的?

于 2013-01-04T18:39:14.387 回答
1

很遥远,因为我不太确定,但是在您的配置中,您排除了所有主题

<excludedDestinations>
    <topic physicalName=">"/> 
</excludedDestinations>

您能否取消该限制以进行测试。当客户端连接到特定队列/主题时,Activemq 使用咨询主题进行通信。因此,您的第三个经纪人可能不知道其他客户,因为您阻止了咨询主题。

于 2012-12-29T13:15:05.687 回答