1

我正在尝试实施提议的 SCS 聚合,但我不确定了解它们的真正目的,因为结果让我感到惊讶。首先,这是代码...

源,要安排的消息提供程序:

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

    private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);

    @Bean
    @InboundChannelAdapter(Source.OUTPUT)
    public MessageSource<String> createMessage() {
        return () -> {
            String payload = now().toString();
            logger.warn("Sent: " + payload);
            return new GenericMessage<>(payload);
        };
    }
}

然后是一个简单的处理器(变压器):

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

    @Transformer(inputChannel = Processor.INPUT,
                 outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        return payload + " is the time.";
    }
}

这是最终消费者(接收器):

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

    private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        logger.warn("Received: " + payload);
    }
}

最后,聚合将这三个链接起来:

@SpringBootApplication
public class SampleAggregateApplication {

    public static void main(String[] args) {
        new AggregateApplicationBuilder().web(false)
            .from(SourceApplication.class)
                .args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
            .via(ProcessorApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step1",
                      "--spring.cloud.stream.bindings.output.destination=step2")
            .to(SinkApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step2")
        .run(args);
    }
}

当聚合启动时,这些是检索到的跟踪的摘录:

2017-02-03 09:59:13.428  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:20.018
2017

我原以为每条消息都会遵循定义的周期:源-处理器-接收器。我们可以看到至少有 3 条消息中的 2 条丢失了,并且只有 4 条消息中的 1 条被转换了。注意:通道目标是在第二次尝试中添加的,以避免应用程序之间的假定混淆(使用相同的 RabbitMQ 中间件)。

有人能告诉我我是否正确地理解了聚合目的并做了正确的事情来实现它吗?提前致谢。

4

1 回答 1

1

有几件事:

  • 您不应该为您的应用程序指定目的地,因为作为聚合一部分的应用程序通过内部渠道进行通信,而不是通过代理;
  • 其次(不幸的是,这是我们的文档没有指定的内容) - 当@SpringBootApplication在每个聚合组件定义上使用时,聚合的不同部分必须属于不同的 Java 包。在您的情况下发生的情况是,您在各种渠道上获得了多个消费者,而不是您期望的链接。将 Source、Transformer 和 Sink 移动到单独的包中应该适合您。此外,添加了https://github.com/spring-cloud/spring-cloud-stream/issues/785来跟踪这一点。
于 2017-02-03T14:52:30.160 回答