6

我想知道 Camel 是否可以根据交换的内容进行节流。

情况如下:我必须通过肥皂调用网络服务。其中,发送给那个webservice的参数有一个customerId。问题是,如果给定的 customerId 每分钟有超过 1 个请求,则 Web 服务会发回错误。

我想知道是否可以使用 Camel 实现每个 customerId 的节流。因此,不应该对所有消息实施限制,而应仅对具有相同 customerId 的消息实施限制。

让我知道如何实现这一点,或者我是否需要澄清我的问题。

4

3 回答 3

2

ActiveMQ 消息组旨在处理这种情况。因此,如果您可以在路由中引入 JMS 队列跃点,那么只需将 JMSXGroupId 标头设置为 customerId。然后在另一条路线中,您可以从此队列中消费并发送到您的 Web 服务以获取您描述的行为。

另请参阅http://camel.apache.org/parallel-processing-and-ordering.html了解更多信息......

于 2012-09-11T15:31:19.630 回答
2

虽然 ActiveMQ 消息组肯定会解决唯一客户 ID 的并行处理,但在我的评估中,克劳斯是正确的,为每个唯一组引入节流阀代表了 Camel/ActiveMQ 未实现的功能。

单独的消息组将不符合所描述的 SLA。虽然每组消息(与客户 ID 相关)将按顺序处理,每组一个线程,只要请求不到一分钟即可收到响应,则不会强制执行每个客户每分钟一个请求的要求.

也就是说,我很想知道是否可以将消息组和节流策略结合起来,以模拟 JIRA 中的功能请求。到目前为止,我的尝试都失败了。我在想一些事情:

<route>
  <from uri="activemq:pending?maxConcurrentConsumers=10"/>
  <throttle timePeriodMillis="60000">
    <constant>1</constant>
    <to uri="mock:endpoint"/>
  </throttle>
</route>

但是,限制似乎适用于移动到端点的整个请求集,而不是适用于每个单独的消费者。我不得不承认,我有点惊讶地发现这种行为。我的期望是限制将单独应用于每个消费者,这将满足原始问题中的 SLA,前提是消息在 JMSXGroupId 标头中包含客户 ID。

于 2012-12-21T15:42:08.513 回答
2

我遇到了类似的问题,最后想出了这里描述的解决方案。

我的假设是:

  • 消息的顺序并不重要(尽管它可以通过重新排序器来解决)
  • 每个客户 ID 的消息总量不是很大,因此运行时没有饱和。

解决办法:

  • 运行 aggregator 1 分钟,同时使用 customerID 将具有相同客户 ID 的消息组装到列表中
  • 使用 Splitter 将列表拆分为单独的消息
  • 将第一条消息从拆分器发送到实际服务
  • 将列表的其余部分重新路由回聚合器。

Java DSL 版本更容易理解:

final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
        .accumulateInCollection(ArrayList.class);

from("direct:start")
    .log("Receiving ${body}")
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
        .log("Aggregate: releasing ${body}")
        .split(body())
        .choice()
            .when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
                .log("*** Processing: ${body}")
                .to("mock:result")
            .otherwise()
              .to("seda:delay")
        .endChoice();

from("seda:delay")
    .delay(0)
    .to("direct:start");

Spring XML 版本如下所示:

 <!-- this is our aggregation strategy defined as a spring bean -->
 <!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
 <bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
 <bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
     <constructor-arg value="java.util.ArrayList" />
 </bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">
       <route>
           <from uri="direct:start"/>
           <log message="Receiving ${body}"/>
           <aggregate strategyRef="_flexible2" completionTimeout="60000" >
               <correlationExpression>
                   <xpath>/order/@customerID</xpath>
               </correlationExpression>
               <log message="Aggregate: releasing ${body}"/>
               <split>
                   <simple>${body}</simple>
                   <choice>
                       <when>
                           <simple>${header.CamelSplitIndex} == 0</simple>
                           <log message="*** Processing: ${body}"/>
                           <to uri="mock:result"/>
                       </when>
                       <otherwise>
                           <log message="--- Delaying: ${body}"/>
                           <to uri="seda:delay" />
                       </otherwise>
                   </choice>
               </split>
           </aggregate>
       </route>

       <route>
           <from uri="seda:delay"/>
           <to uri="direct:start"/>
       </route>
</camelContext>
于 2016-12-21T02:21:25.757 回答