我想知道 Camel 是否可以根据交换的内容进行节流。
情况如下:我必须通过肥皂调用网络服务。其中,发送给那个webservice的参数有一个customerId。问题是,如果给定的 customerId 每分钟有超过 1 个请求,则 Web 服务会发回错误。
我想知道是否可以使用 Camel 实现每个 customerId 的节流。因此,不应该对所有消息实施限制,而应仅对具有相同 customerId 的消息实施限制。
让我知道如何实现这一点,或者我是否需要澄清我的问题。
我想知道 Camel 是否可以根据交换的内容进行节流。
情况如下:我必须通过肥皂调用网络服务。其中,发送给那个webservice的参数有一个customerId。问题是,如果给定的 customerId 每分钟有超过 1 个请求,则 Web 服务会发回错误。
我想知道是否可以使用 Camel 实现每个 customerId 的节流。因此,不应该对所有消息实施限制,而应仅对具有相同 customerId 的消息实施限制。
让我知道如何实现这一点,或者我是否需要澄清我的问题。
ActiveMQ 消息组旨在处理这种情况。因此,如果您可以在路由中引入 JMS 队列跃点,那么只需将 JMSXGroupId 标头设置为 customerId。然后在另一条路线中,您可以从此队列中消费并发送到您的 Web 服务以获取您描述的行为。
另请参阅http://camel.apache.org/parallel-processing-and-ordering.html了解更多信息......
虽然 ActiveMQ 消息组肯定会解决唯一客户 ID 的并行处理,但在我的评估中,克劳斯是正确的,为每个唯一组引入节流阀代表了 Camel/ActiveMQ 未实现的功能。
单独的消息组将不符合所描述的 SLA。虽然每组消息(与客户 ID 相关)将按顺序处理,每组一个线程,只要请求不到一分钟即可收到响应,则不会强制执行每个客户每分钟一个请求的要求.
也就是说,我很想知道是否可以将消息组和节流策略结合起来,以模拟 JIRA 中的功能请求。到目前为止,我的尝试都失败了。我在想一些事情:
<route>
<from uri="activemq:pending?maxConcurrentConsumers=10"/>
<throttle timePeriodMillis="60000">
<constant>1</constant>
<to uri="mock:endpoint"/>
</throttle>
</route>
但是,限制似乎适用于移动到端点的整个请求集,而不是适用于每个单独的消费者。我不得不承认,我有点惊讶地发现这种行为。我的期望是限制将单独应用于每个消费者,这将满足原始问题中的 SLA,前提是消息在 JMSXGroupId 标头中包含客户 ID。
我遇到了类似的问题,最后想出了这里描述的解决方案。
我的假设是:
解决办法:
Java DSL 版本更容易理解:
final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
.accumulateInCollection(ArrayList.class);
from("direct:start")
.log("Receiving ${body}")
.aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
.log("Aggregate: releasing ${body}")
.split(body())
.choice()
.when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
.log("*** Processing: ${body}")
.to("mock:result")
.otherwise()
.to("seda:delay")
.endChoice();
from("seda:delay")
.delay(0)
.to("direct:start");
Spring XML 版本如下所示:
<!-- this is our aggregation strategy defined as a spring bean -->
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
<constructor-arg value="java.util.ArrayList" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<log message="Receiving ${body}"/>
<aggregate strategyRef="_flexible2" completionTimeout="60000" >
<correlationExpression>
<xpath>/order/@customerID</xpath>
</correlationExpression>
<log message="Aggregate: releasing ${body}"/>
<split>
<simple>${body}</simple>
<choice>
<when>
<simple>${header.CamelSplitIndex} == 0</simple>
<log message="*** Processing: ${body}"/>
<to uri="mock:result"/>
</when>
<otherwise>
<log message="--- Delaying: ${body}"/>
<to uri="seda:delay" />
</otherwise>
</choice>
</split>
</aggregate>
</route>
<route>
<from uri="seda:delay"/>
<to uri="direct:start"/>
</route>
</camelContext>