我目前正在尝试将一些遗留代码/系统迁移到 Camel。当前系统正在处理队列的大量文件夹,并按顺序处理这些队列中的所有消息,而不是在开始处理下一条消息之前完全处理一条消息的完整流。我们想摆脱这个,这似乎可以用 Camel (由于以前使用 Camel 的经验,我已经是新手了)。只有一个“问题”,我似乎无法获得简单的流程定义。在简化形式的文本中,流程是这样的
- 轮询某个文件夹的文件
- 有些文件需要聚合有些可以直接发送(目前基于文件名,但将来可能基于内容)
- 根据内容中的某个字段聚合文件
- 将文件发送到远程系统
这在快乐的日子场景中效果很好。下一步是添加错误处理。为此,我们有一个重要的要求。输入文件夹中的单个文件可能已被聚合,但在重新发送后未发送出去,因为单个文件最终会出现在错误文件夹中。
这就是问题所在,至少在保持流程简单方面。我提到“至少保持流程简单”的原因是,通过(很多)额外的节点和步骤,我可以实现我想要的(不会发布这个例子,因为这不是我想要的“建议” )。我正在使用的流程是这样的:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring https://camel.apache.org/schema/spring/camel-spring-3.4.0.xsd">
<bean class="org.apache.camel.processor.aggregate.StringAggregationStrategy" id="myAggregationStrategy">
<camelContext>
<route>
<from uri="file://c:/data/store/merge/?moveFailed=.error"/>
<setHeader headerName="MyGroupingID" id="_setHeader1">
<simple resultType="int">${header.CamelFileName.split('_|\.')[0]}</simple>
</setHeader>
<aggregate strategyRef="myAggregationStrategy">
<correlationExpression>
<header>MyGroupingID</header>
</correlationExpression>
<completionTimeout>
<constant>5000</constant>
</completionTimeout>
<choice id="_choice1">
<when id="_when1">
<simple>${exchange.properties['CamelAggregatedCorrelationKey']} == 1</simple>
<throwException exceptionType="java.lang.Exception" id="_throwException1" message="Group 1 has an error"/>
</when>
<otherwise id="_otherwise1">
<log id="processMessage" message="Processed exchange: [${exchange}] body: [${body}] headers: [${headers}]."/>
</otherwise>
</choice>
</aggregate>
</route>
</contextx>
</beans>
使用聚合器时发生的情况是,一旦单个消息进入聚合器节点并被处理,它就会从输入文件夹移动到默认.camel
文件夹(或您指定的任何文件夹)。所以我打开了跟踪级别的日志并开始调查为什么会发生这种情况。实际移动是由GenericFileOnCompletion
which 完成的,而 which 似乎Synchronization
在 UnitOfWork 中被注册为 a 并在与 which 没有任何关系时触发,Exchange
因为AggregateProcessor
似乎制作了交换的副本。
我试过的:
- 将自定义 onCompletion 添加到路由(或全局),但它仅针对原始交换触发
- 添加一个错误处理程序,但它只为聚合的 Exchange 触发
我进行了更多调试,并注意到使用基于我的自定义聚合策略, GroupedExchangeAggregationStrategy
我可以拥有一个新的“聚合”主体,并且单个交换作为List<Exchange
聚合交换中的一个属性。我(我想我)可以在 errorHandler 中使用原始交换的副本列表来解析这些交换中的每一个,并使用对原始位置的引用并将文件从.camel
文件夹移动到文件.error
夹(两个属性都可以从文件中检索端点(所有这些都在一个处理器中,因此我们的流程开发人员隐藏了这个逻辑)
我知道的其他事情:
- 我可以创建自己的处理器来完成聚合器所做的事情,但我不确定我是否可以让原始交换保持“活跃”?
- 我可能需要一些“持久性”(例如用于聚合的 leveldb)才能在系统崩溃中幸存下来
单元测试:能够重现事物
@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration({"/META-INF/spring/applicationContext.xml"})
@DisableJmx(true)
@UseAdviceWith(true)
public class MessageGroupingTest {
private static final Logger log = LoggerFactory.getLogger(MessageGroupingTest.class);
@Autowired
private CamelContext camelContext;
private final AtomicBoolean adviced = new AtomicBoolean(false);
@Produce(uri = "file://c:/data/store/merge/")
private ProducerTemplate producer;
@EndpointInject(uri = "mock:semiStreamingGrouping")
private MockEndpoint mock;
@Before
public void adviceGroupingRoute() throws Exception {
if (!adviced.get()) {
ModelCamelContext mcc = camelContext.adapt(ModelCamelContext.class);
RouteReifier.adviceWith(mcc.getRouteDefinition("_route1"), mcc, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
weaveById("processMessage")
.after()
.to(mock);
}
});
camelContext.start();
adviced.set(true);
}
}
@Test
public void testGroupingMessages() throws Exception {
String message = "Hello world!";
Map<String, Object> headers = new HashMap<>();
headers.put("MyGroupingID", 1);
headers.put("CamelFileName", "1_2_1.txt");
producer.sendBodyAndHeaders(message, headers);
headers.put("CamelFileName", "1_2_2.txt");
producer.sendBodyAndHeaders(message, headers);
headers.put("CamelFileName", "2_1_1.txt");
producer.sendBodyAndHeaders(message, headers);
mock.expectedMessageCount(1);
Thread.sleep(5000);
mock.assertIsSatisfied(300000L);
}
}
使用 Camel 3.6.0(但也尝试了 2.25.2 以及所有相关(后退)的更改,同样的问题,所以它不是某种回归 ;-)
我不是在寻找代码中的完整解决方案,也会找到有关可能性或不可能性(无法解决的问题)的提示(尽管如果有简单的解决方案,我不介意被提及或将其作为答案;-))
我读过的间接相关的帖子: