1

我有多个将文件发送到服务器的客户端。对于一组数据,有两个文件包含有关该数据的信息,每个文件都具有相同的名称。收到文件后,服​​务器会向我的队列发送一条消息,其中包含文件路径、文件名、客户端 ID 和文件的“类型”(都具有相同的文件扩展名,但有两种“类型, " 称他们为 A 和 B)。

一组数据的两个文件具有相同的文件名。一旦服务器收到这两个文件,我就需要启动一个将两者结合起来的程序。目前我有一些看起来像这样的东西:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");

我被卡住的地方是标题(“CamelFileName”),更具体地说,聚合器是如何工作的。

将 completionSize 设置为 2 时,它是否只是吸收所有消息并将它们存储在某个数据结构中,直到与第一个匹配的第二个消息通过?另外, header() 是否需要一个特定的值?我有多个客户端,所以我想在标题中包含客户端 ID 和文件名,但是我不知道是否必须给出特定值。我也不知道我是否可以使用正则表达式。

任何想法或提示都会非常有帮助。谢谢

编辑:这是我现在拥有的一些代码。根据我在此处对问题的描述以及对所选答案的评论,它看起来是否准确(除了我没有复制的右括号)?

public static void main(String args[]) throws Exception{
        CamelContext c = new DefaultCamelContext();
        c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
        //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        c.addRoutes(new RouteBuilder() {
            public void configure() {
                from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
            }
        });
        c.start();
        while (true) {
            System.out.println("Waiting on messages to come through for camel");
            Thread.sleep(2 * 1000);
        }
        //c.stop();
    }

    private static class MyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null)
                return newExchange;
            // and here is where combo stuff goes
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            boolean oldSet = oldBody.contains("set");
            boolean newSet = newBody.contains("set");
            boolean oldFlow = oldBody.contains("flow");
            boolean newFlow = newBody.contains("flow");
            if ( (oldSet && newFlow) || (oldFlow && newSet) ) {
                //they match so return new exchange with info so extractor can be started with exec
                String combined = oldBody + "\n" + newBody + "\n";
                newExchange.getIn().setBody(combined);
                return newExchange;
            }
            else {
                // no match so do something....
                return null;
            }
        }
    }
4

1 回答 1

3

你必须提供一个 AggregationStrategy 来定义你想如何组合交换......

如果您只对文件名感兴趣并且正好接收 2 个交换,那么您可以使用 UseLatestAggregationStrategy 来传递最新的交换,一旦 2 个被“聚合”...

也就是说,听起来您需要保留两个 Exchange(每个 clientId 一个),以便您可以将该信息传递给“执行”步骤......如果是这样,您可以使用内置的将 Exchange 组合到 GroupedExchange 持有者中-in 通过groupExchanges选项启用聚合策略...或指定自定义 AggregationStrategy 以根据需要组合它们。只需要记住,您的“执行”步骤需要处理您决定使用的任何聚合结构......

有关示例,请参见这些单元测试:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

于 2013-06-21T17:56:04.017 回答