1

我正在构建一个系统,它将通过消息代理(当前为 JMS)接收来自不同系统的消息。来自所有发送者系统的所有消息都有一个 deviceId,并且消息的接收没有顺序。例如,系统 A 可以发送 deviceId=1 的消息,系统 b 可以发送 deviceId=2 的消息。

我的目标是不开始处理有关同一 deviceId 的消息,除非我从所有具有相同 deviceId 的发件人那里收到所有消息。

例如,如果我有 3 个系统 A、B 和 C 向我的系统发送消息:

System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.

这个问题是否应该通过在我的系统中使用一些同步机制、消息代理或像 spring-integration/apache camel 这样的集成框架来解决?

4

3 回答 3

2

与 Aggregator 类似的解决方案(@Artem Bilan 提到的)也可以在 Camel 中通过自定义AggregationStrategy和使用Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP属性控制 Aggregator 完成来实现。

以下可能是一个很好的起点。(您可以在此处找到带有测试的示例项目

路线:

from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");

SignalAggregationStrategy.java

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {

    private int numberOfSystems;

    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);

        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);

        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }

        return exchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}

希望能帮助到你!

于 2017-09-18T19:34:29.460 回答
1

您可以使用缓存组件在 Apache Camel 中执行此操作。我认为有 EHCache 组件。

本质上:

  1. 您会收到一条带有给定 deviceId 的消息,比如 deviceId1。
  2. 您在缓存中查找已收到 deviceId1 的哪些消息。
  3. 只要您没有收到所有三个,您就可以将当前系统/消息添加到缓存中。
  4. 一旦所有消息都在那里,您就可以处理并清除缓存。

然后,您当然可以将每条传入消息路由到基于特定 deviceId 的队列以进行临时存储。这可以是 JMS、ActiveMQ 或类似的东西。

于 2017-09-16T14:53:55.140 回答
1

Spring Integration 为此类任务提供了组件 - 在收集整个组之前不要发出。它的名字叫Aggregator。你deviceId的绝对是一个correlationKey。这releaseStrategy实际上可能取决于系统的数量 -deviceId1在继续下一步之前您正在等待多少消息。

于 2017-09-18T14:22:17.417 回答