1

我是 Spring Integration 和 EIP 的新手。

我试图解决自己,但它不会在我的脑海中出现正确的解决方案。

这是我需要的流程示例。

  1. 客户端向 SI 发送 Json 数据。
  2. HTTP 入站网关将 Json 数据作为有效负载。
  3. 此 Json 数据将用于两个单独的 HTTP 出站。但是第二个出站呼叫是在第一个呼叫出站呼叫的响应之后进行的。
  4. 将结果回复到第 0 步(HTTP 入站网关上的回复通道)

我正在考虑使用 publishSubscribeChannel 或 RecipientListRouter 将相同的消息发送到单独的调用。但在这种情况下,它需要在特定时间执行第二个 http outbound。据我了解,这应该是第 4 步回复的一项交易。

或者...

是否可以在第一次调用后的第二次调用中保留消息实例(来自 http 入站网关的 JSON 数据)。

请给我一些想法,如果您有示例代码,那一定很棒。目前我正在编写 JAVA 配置样式,但也欢迎 XML 示例。

更新问题

我正在尝试添加网桥和 executorChannel。

这是我的代码,几乎没有按我预期的那样工作。

  1. 我想第二个出站处理程序(httpPMGateway)使用来自 aIncomingGateway 的消息(因此使用 publishSubscribeChannel),但是在 httpJ3Gateway 出站处理程序之后,它使用来自 httpJ3Gateway 的消息。我将路由与来自 httpJ3Gateway 的响应消息一起使用。(好的-> httpPMgateway,默认-> backAChannel)

  2. 我想从 j3ValidationFlow 或广播流的末尾向 aIncomingGateway 发送响应消息。但是 aIncomingGateway 在发送之前就已经收到了回复消息。这是日志消息。

2017-05-16 11:17:09.061 WARN 11488 --- [pool-1-thread-1] cMessagingTemplate$TemporaryReplyChannel :收到回复消息,但接收线程已经收到回复:

这是代码部分:

@Bean
public MessagingGatewaySupport aIncomingGateway() {
    HttpRequestHandlingMessagingGateway handler = new HttpRequestHandlingMessagingGateway(true);

    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setMethods(HttpMethod.POST);
    requestMapping.setPathPatterns("/a/incoming");
    requestMapping.setConsumes("application/json");
    requestMapping.setProduces("application/json");


    handler.setRequestMapping(requestMapping);
    handler.setMessageConverters(getMessageConverters());
    handler.setRequestPayloadType(Amount.class);

    handler.setReplyChannelName("backAChannel");
    return handler; 
}

@Bean
public MessageHandler httpJ3Gateway(){
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/j3/incoming");
    httpHandler.setHttpMethod(HttpMethod.POST);
    httpHandler.setExpectReply(true);
    httpHandler.setMessageConverters(getMessageConverters());
    httpHandler.setExpectedResponseType(String.class);
    httpHandler.setRequestFactory(requestFactory());
    return httpHandler;
}

@Bean
public MessageHandler httpPMGateway(){
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler("http://localhost:8080/pm/incoming");
    httpHandler.setHttpMethod(HttpMethod.POST);
    httpHandler.setExpectReply(true);
    httpHandler.setMessageConverters(getMessageConverters());
    httpHandler.setExpectedResponseType(String.class);
    return httpHandler;
}

@Bean
public MessageChannel broadChannel(){
    return new ExecutorChannel(Executors.newCachedThreadPool());
}

@Bean
@ServiceActivator(inputChannel = "brigdeChannel")
public MessageHandler bridgeHandler(){
    BridgeHandler handler = new BridgeHandler();
    handler.setOutputChannel(broadChannel());
    return handler;
}


@Bean
public IntegrationFlow aIncomingFlow() {
    return IntegrationFlows.from(aIncomingGateway())
            .log(LoggingHandler.Level.INFO, "From Inbound http gateway", m -> m.getPayload().toString())
            .publishSubscribeChannel(p -> p
                    .subscribe(s -> s.channel("j3Channel"))
                    .subscribe(s -> s.bridge(b -> bridgeHandler()) )
                    )
            .get();
}

@Bean
public IntegrationFlow j3ValidationFlow() {
    return IntegrationFlows.from("j3Channel")
            .log(LoggingHandler.Level.INFO, "Run j3ChannelFlow", m -> m.getPayload().toString())
            .handle(httpJ3Gateway())
            .route("headers.http_statusCode", m -> m
                    .resolutionRequired(false)
                    .channelMapping("OK", "brigdeChannel")
                    .defaultOutputToParentFlow()
                    )
            .channel("backAChannel")
            .get();
}

@Bean
public IntegrationFlow broadcastFlow() {
    return IntegrationFlows.from("broadChannel")
            .log(LoggingHandler.Level.INFO, "Run broadcastFlow", m -> m.getPayload().toString())
            .handle(httpPMGateway())
            .channel("backAChannel")
            .get();
}

请帮我想出一个主意。
我提前表示感谢。谢谢你在这个愚蠢的问题上帮助我。

4

1 回答 1

0

是的; 使用发布/订阅频道或收件人列表路由器

inbound -> ... -> pubsub

pubsub -> outbound1

pubsub -> bridge -> executorChannel -> outbound2

outbound1 将在入站线程上运行;outbound2 将在另一个线程上运行。

于 2017-05-15T18:35:59.560 回答