1

我目前正在尝试将一些遗留代码/系统迁移到 Camel。当前系统正在处理队列的大量文件夹,并按顺序处理这些队列中的所有消息,而不是在开始处理下一条消息之前完全处理一条消息的完整流。我们想摆脱这个,这似乎可以用 Camel (由于以前使用 Camel 的经验,我已经是新手了)。只有一个“问题”,我似乎无法获得简单的流程定义。在简化形式的文本中,流程是这样的

  • 轮询某个文件夹的文件
  • 有些文件需要聚合有些可以直接发送(目前基于文件名,但将来可能基于内容)
  • 根据内容中的某个字段聚合文件
  • 将文件发送到远程系统

这在快乐的日子场景中效果很好。下一步是添加错误处理。为此,我们有一个重要的要求。输入文件夹中的单个文件可能已被聚合,但在重新发送后未发送出去,因为单个文件最终会出现在错误文件夹中。

这就是问题所在,至少在保持流程简单方面。我提到“至少保持流程简单”的原因是,通过(很多)额外的节点和步骤,我可以实现我想要的(不会发布这个例子,因为这不是我想要的“建议” )。我正在使用的流程是这样的:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:camel="http://camel.apache.org/schema/spring"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring https://camel.apache.org/schema/spring/camel-spring-3.4.0.xsd">

    <bean class="org.apache.camel.processor.aggregate.StringAggregationStrategy" id="myAggregationStrategy">
    
    <camelContext>
        <route>
            <from uri="file://c:/data/store/merge/?moveFailed=.error"/>
                           
            <setHeader headerName="MyGroupingID" id="_setHeader1">
                <simple resultType="int">${header.CamelFileName.split('_|\.')[0]}</simple>
            </setHeader>
            <aggregate  strategyRef="myAggregationStrategy">
                <correlationExpression>
                    <header>MyGroupingID</header>
                </correlationExpression>
                <completionTimeout>
                    <constant>5000</constant>
                </completionTimeout>
                <choice id="_choice1">
                    <when id="_when1">
                        <simple>${exchange.properties['CamelAggregatedCorrelationKey']} == 1</simple>
                        <throwException exceptionType="java.lang.Exception" id="_throwException1" message="Group 1 has an error"/>
                    </when>
                    <otherwise id="_otherwise1">
                        <log id="processMessage" message="Processed exchange: [${exchange}] body: [${body}] headers: [${headers}]."/>
                    </otherwise>
                </choice>
            </aggregate>
        </route>
    </contextx>
</beans>

使用聚合器时发生的情况是,一旦单个消息进入聚合器节点并被处理,它就会从输入文件夹移动到默认.camel文件夹(或您指定的任何文件夹)。所以我打开了跟踪级别的日志并开始调查为什么会发生这种情况。实际移动是由GenericFileOnCompletionwhich 完成的,而 which 似乎Synchronization在 UnitOfWork 中被注册为 a 并在与 which 没有任何关系时触发,Exchange因为AggregateProcessor似乎制作了交换的副本。

我试过的:

  • 将自定义 onCompletion 添加到路由(或全局),但它仅针对原始交换触发
  • 添加一个错误处理程序,但它只为聚合的 Exchange 触发

我进行了更多调试,并注意到使用基于我的自定义聚合策略, GroupedExchangeAggregationStrategy我可以拥有一个新的“聚合”主体,并且单个交换作为List<Exchange聚合交换中的一个属性。我(我想我)可以在 errorHandler 中使用原始交换的副本列表来解析这些交换中的每一个,并使用对原始位置的引用并将文件从.camel文件夹移动到文件.error夹(两个属性都可以从文件中检索端点(所有这些都在一个处理器中,因此我们的流程开发人员隐藏了这个逻辑)

我知道的其他事情:

  • 我可以创建自己的处理器来完成聚合器所做的事情,但我不确定我是否可以让原始交换保持“活跃”?
  • 我可能需要一些“持久性”(例如用于聚合的 leveldb)才能在系统崩溃中幸存下来

单元测试:能够重现事物

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration({"/META-INF/spring/applicationContext.xml"})
@DisableJmx(true)
@UseAdviceWith(true)
public class MessageGroupingTest {

  private static final Logger log = LoggerFactory.getLogger(MessageGroupingTest.class);

  @Autowired
  private CamelContext camelContext;
  
  private final AtomicBoolean adviced = new AtomicBoolean(false);

  @Produce(uri = "file://c:/data/store/merge/")
  private ProducerTemplate producer;

  @EndpointInject(uri = "mock:semiStreamingGrouping")
  private MockEndpoint mock;

  @Before
  public void adviceGroupingRoute() throws Exception {
    if (!adviced.get()) {
        ModelCamelContext mcc = camelContext.adapt(ModelCamelContext.class);
        RouteReifier.adviceWith(mcc.getRouteDefinition("_route1"), mcc, new AdviceWithRouteBuilder() {
          @Override
        public void configure() throws Exception {
            weaveById("processMessage")
            .after()
            .to(mock);
            
        }
        });
      camelContext.start();
      adviced.set(true);
    }
  }
  
  @Test
  public void testGroupingMessages() throws Exception {
    String message = "Hello world!";
    
      Map<String, Object> headers = new HashMap<>();

      headers.put("MyGroupingID", 1);
  
      headers.put("CamelFileName", "1_2_1.txt");
      producer.sendBodyAndHeaders(message, headers);
      
      headers.put("CamelFileName", "1_2_2.txt");
      producer.sendBodyAndHeaders(message, headers);

      headers.put("CamelFileName", "2_1_1.txt");
      producer.sendBodyAndHeaders(message, headers);


    mock.expectedMessageCount(1);
    Thread.sleep(5000);
    mock.assertIsSatisfied(300000L);
  }
}

使用 Camel 3.6.0(但也尝试了 2.25.2 以及所有相关(后退)的更改,同样的问题,所以它不是某种回归 ;-)

我不是在寻找代码中的完整解决方案,也会找到有关可能性或不可能性(无法解决的问题)的提示(尽管如果有简单的解决方案,我不介意被提及或将其作为答案;-))

我读过的间接相关的帖子:

4

0 回答 0