我对路由的理解(在 Apache Camel 术语中)是它表示从一个端点到另一个端点的数据流,并且它会在对数据执行 EIP 类型操作的过程中停止在各个处理器处。
如果这是对路线的正确/公平评估,那么我正在建模一个我认为需要在同一条路线内有几条路线的问题CamelContext
(我使用的是 Spring):
- 路线 1:从 Source-1 中提取数据,对其进行处理,将其转换为 a
List<SomePOJO>
,然后将其发送到聚合器 - 路线 2:从 Source-2 中提取数据,对其进行处理,还将其转换为 a
List<SomePOJO>
,然后将其发送到聚合器 - Route 3:包含一个聚合器,等待它
List<SomePOJO>
从Route 1 和 Route 2 接收到 a ,然后继续处理聚合列表
事情是这样的:两个List<SomePOJO>
s 需要同时到达聚合器,或者更确切地说,聚合器 bean 必须等到它从两个路由接收到数据之前,它才能将 2 个列表聚合为一个List<SomePOJO>
列表并将聚合列表发送到3号公路的其余部分。
到目前为止,我有以下伪编码<camelContext>
:
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
<!-- Route 1 -->
<route id="route-1">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor1?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 2 -->
<route id="route-2">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor2?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 3 -->
<route id="route-3">
<from uri="direct:aggregator" />
<aggregate strategyRef="listAggregatorStrategy">
<correlationExpression>
<!-- Haven't figured this part out yet. -->
</correlationExpression>
<to uri="bean:lastProcessor?method=process" />
</aggregate>
</route>
</camelContext>
<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />
然后在Java中:
public class ListAggregatorStrategy implements AggregatoryStrategy {
public Exchange aggregate(Exchange exchange) {
List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);
List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
aggregateList.addAll(route2POJOs);
return aggregateList;
}
}
我的问题
- 我的基本设置正确吗?换句话说,我
direct:aggregator
是否正确使用端点将数据从聚合器发送出去route-1
和发送route-2
到route-3
聚合器? - 我的聚合器会按照我期望的方式工作吗?假设
extractor1
bean inroute-1
只需要 5 秒运行,但extractor2
bean inroute-2
需要 2 分钟运行。在 t=5 时,聚合器应该从那里接收数据extractor1
并开始等待(2 分钟)直到extractor2
完成,然后将其余数据提供给聚合器。是的?