3

我有以下服务器配置:

@Configuration
@EnableIntegration
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class))
                .enrich(enricherSpec -> {
                    enricherSpec.header("correlationId", 1); //ackCorrelationId
                })
                .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
                .log()
                //.barrier(1000L)
                .log()
                .handle(Amqp.outboundAdapter(amqpTemplate())
                        .exchangeName("barrierExchange")
                        .routingKey("barrierKey")
                        .confirmAckChannel(confirmAckChannel())
                        .confirmCorrelationExpression("payload")
                )
                .get();
    }


    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost("localhost");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpTemplate amqpTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public DirectChannel confirmAckChannel() {
        return new DirectChannel();
    }

    @Bean

    public IntegrationFlow ackChannelListener() {
        return IntegrationFlows.from(confirmAckChannel())
                .handle(m -> {
                    System.out.println("ACK:" + m);
                })
                .get();
    }

}

和以下客户端配置:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(consoleSource(), consoleConsumer())
                .handle(httpOutboundGateway())
                .log()
                .channel("httpRequestChannel")
                .handle(s -> {
                    System.out.println("We got response: " + s);
                })
                .get();
    }

    private HttpMessageHandlerSpec httpOutboundGateway() {
        return Http.outboundGateway("http://localhost:8080/spring_integration_post") //http://localhost:8080/my_post
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class);
    }

    private Consumer<SourcePollingChannelAdapterSpec> consoleConsumer() {
        return c -> c.poller(Pollers.fixedRate(1000)
                .maxMessagesPerPoll(1));
    }

    public MessageSource<String> consoleSource() {
        return CharacterStreamReadingMessageSource.stdin();
    }
}

从我发送的客户a,v,b

我看到服务器接受了该消息,将 3 条消息发送到 rabbitMq(通过 rabbit admin 我看到消息确实被 rabbit 接受了)并得到 3 个确认:

ACK:GenericMessage [payload=a, headers={amqp_publishConfirm=true, id=eb8fd94b-5721-8b3b-5219-b7a4e0d950a8, timestamp=1567062603387}]
ACK:GenericMessage [payload=v, headers={amqp_publishConfirm=true, id=724b7def-1ab2-79b9-c788-27f6cbe24a33, timestamp=1567062603388}]
ACK:GenericMessage [payload=b, headers={amqp_publishConfirm=true, id=12a10799-d664-64dc-dd9e-1601fda61977, timestamp=1567062603389}]

客户端打印以下跟踪:

2019-08-29 10:10:04.392 ERROR 18404 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=a,v,b, headers={id=0b902e6f-d4de-329f-9916-19f94cdedc4e, timestamp=1567062603152}]
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
    at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
    at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
    at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
    ... 30 more

为什么会发生?

我该如何解决?

附言

正如@Artem Bilan 所说,我的问题是我没有对客户做出任何回应,因此客户遇到了超时错误。我想说这个错误真的很令人困惑。我希望 504 错误超时。

在所有消息都发送给rabbit之后,我试图向客户端返回一些东西,所以我编写了以下配置:

@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            .handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey")
                    .confirmAckChannel(confirmAckChannel())
                    .confirmCorrelationFunction(Message::getPayload)
            ).handle((payload, headers) -> {
                System.out.println("Before aggregation");
                return true;
            })
            .aggregate()
            .handle((payload, headers) -> {
                System.out.println("After aggregation");
                return true;
            }).get();

但是从客户端我看到了相同的堆栈跟踪。另外从服务器端我看到了那个痕迹:

2019-08-30 12:19:10.166  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.167  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=A, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=bb52003e-0731-d64c-1285-e28adb93c87a, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750166}]
2019-08-30 12:19:10.172  INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5672
2019-08-30 12:19:10.198  INFO 12224 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#123d7057:0/SimpleConnection@2ae574e5 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63045]
2019-08-30 12:19:10.224  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
2019-08-30 12:19:10.225  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=B, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=b4b5415d-4ad9-167b-2bea-905ec1cc66cf, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750224}]
ACK:GenericMessage [payload=A, headers={amqp_publishConfirm=true, id=5d85f459-661d-69c4-4d36-b5843289b41b, timestamp=1567156750227}]
2019-08-30 12:19:10.227  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
2019-08-30 12:19:10.228  INFO 12224 --- [nio-8080-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=C, headers={content-length=5, http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, accept=[text/plain, application/json, application/*+json, */*], accept-charset=[Big5, Big5-HKSCS, CESU-8, EUC-JP, EUC-KR, GB18030, GB2312, GBK, IBM-Thai, IBM00858, IBM01140, IBM01141, IBM01142, IBM01143, IBM01144, IBM01145, IBM01146, IBM01147, IBM01148, IBM01149, IBM037, IBM1026, IBM1047, IBM273, IBM277, IBM278, IBM280, IBM284, IBM285, IBM290, IBM297, IBM420, IBM424, IBM437, IBM500, IBM775, IBM850, IBM852, IBM855, IBM857, IBM860, IBM861, IBM862, IBM863, IBM864, IBM865, IBM866, IBM868, IBM869, IBM870, IBM871, IBM918, ISO-2022-CN, ISO-2022-JP, ISO-2022-JP-2, ISO-2022-KR, ISO-8859-1, ISO-8859-13, ISO-8859-15, ISO-8859-16, ISO-8859-2, ISO-8859-3, ISO-8859-4, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, JIS_X0201, JIS_X0212-1990, KOI8-R, KOI8-U, Shift_JIS, TIS-620, US-ASCII, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE, UTF-8, windows-1250, windows-1251, windows-1252, windows-1253, windows-1254, windows-1255, windows-1256, windows-1257, windows-1258, windows-31j, x-Big5-HKSCS-2001, x-Big5-Solaris, x-euc-jp-linux, x-EUC-TW, x-eucJP-Open, x-IBM1006, x-IBM1025, x-IBM1046, x-IBM1097, x-IBM1098, x-IBM1112, x-IBM1122, x-IBM1123, x-IBM1124, x-IBM1166, x-IBM1364, x-IBM1381, x-IBM1383, x-IBM300, x-IBM33722, x-IBM737, x-IBM833, x-IBM834, x-IBM856, x-IBM874, x-IBM875, x-IBM921, x-IBM922, x-IBM930, x-IBM933, x-IBM935, x-IBM937, x-IBM939, x-IBM942, x-IBM942C, x-IBM943, x-IBM943C, x-IBM948, x-IBM949, x-IBM949C, x-IBM950, x-IBM964, x-IBM970, x-ISCII91, x-ISO-2022-CN-CNS, x-ISO-2022-CN-GB, x-iso-8859-11, x-JIS0208, x-JISAutoDetect, x-Johab, x-MacArabic, x-MacCentralEurope, x-MacCroatian, x-MacCyrillic, x-MacDingbat, x-MacGreek, x-MacHebrew, x-MacIceland, x-MacRoman, x-MacRomania, x-MacSymbol, x-MacThai, x-MacTurkish, x-MacUkraine, x-MS932_0213, x-MS950-HKSCS, x-MS950-HKSCS-XP, x-mswin-936, x-PCK, x-SJIS_0213, x-UTF-16LE-BOM, X-UTF-32BE-BOM, X-UTF-32LE-BOM, x-windows-50220, x-windows-50221, x-windows-874, x-windows-949, x-windows-950, x-windows-iso2022jp], replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e4e6738, host=localhost:8080, http_requestUrl=http://localhost:8080/spring_integration_post, connection=keep-alive, id=a4a4838b-849f-35dd-b451-a38b5a5edb13, contentType=text/plain;charset=UTF-8, user-agent=Java/11.0.3, timestamp=1567156750227}]
ACK:GenericMessage [payload=B, headers={amqp_publishConfirm=true, id=1136d70b-8dc5-2397-f936-0246c0ad8073, timestamp=1567156750231}]
ACK:GenericMessage [payload=C, headers={amqp_publishConfirm=true, id=1b74d944-ec4c-75c2-83f5-3b36a8e89a39, timestamp=1567156750232}]

这意味着根本没有调用我的 2 个处理程序。为什么?

更新

我还能够应用与发布订阅相关的建议:

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class))
//                .enrich(enricherSpec -> {
//                    enricherSpec.header("correlationId", 1); //ackCorrelationId
//                })
                .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
                .log()
                //.barrier(1000L)
                .log()
                .publishSubscribeChannel(publishSubscribeSpec -> {
                            publishSubscribeSpec.applySequence(true);
                            publishSubscribeSpec.subscribe(f -> Amqp.outboundAdapter(amqpTemplate())
                                    .exchangeName("barrierExchange")
                                    .routingKey("barrierKey")
                                    .confirmAckChannel(confirmAckChannel())
                                    .confirmCorrelationFunction(Message::getPayload));
                            publishSubscribeSpec.subscribe(flow -> {
                                flow.handle((p, h) -> "from server: " + p);
                            });
                        }
                ) .get();

错误消失了,但客户端只得到最后一个拆分的部分:

1,2,3,4,5
2019-08-30 12:48:24.570  INFO 19476 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]
We got response: GenericMessage [payload=from server: 5, headers={connection=keep-alive, id=984d89ef-aa99-7f37-8bfa-99612c05831a, Content-Length=14, contentType=text/plain;charset=UTF-8, http_statusCode=200 OK, Date=1567158504000, timestamp=1567158504569}]

据我了解,在这种情况下,2 个子流是独立的,由于某种原因,只有最后一部分被发送给客户端......

4

2 回答 2

1

你的问题在这里:

.handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey")
                    .confirmAckChannel(confirmAckChannel())
                    .confirmCorrelationExpression("payload")
            )
            .get();

您只需停止您的流程,同时它从Http.inboundGateway()其中开始,除了一些回复发回。随着Amqp.outboundAdapter()您将流程设为单向,因此不会将回复发送回等待的入站网关。

由于没有回复发送回客户端,因此最终会出现request timeout错误,表明服务器没有及时发送回复是有罪的。

这是您的情况的答案。如何修复以及超出此问题范围的内容。我看到您尝试将barrier示例从 XML 复制到 Java DSL,但是您的配置中缺少很多部分。而且你还在几个地方打破了逻辑:

enricherSpec.header("correlationId", 1)

1您对每个请求都使用相同的静态。因此,当涉及到相关逻辑时,您将遇到不同请求之间的冲突问题。

 .split(s -> s.applySequence(false)

如果你要实现barrier,你确实会有一个聚合器,当逻辑真的基于相关性时。因此,之后applySequence必须true进行适当的聚合。从这里开始,这header("correlationId"又是错误的。我们需要一个相关键barrier(基本上在请求和延迟回复之间)。这就是为什么它就像ackCorrelationId在示例中一样。另一个内置correlationId(连同其他相关细节)由 spitter 填充并由聚合器使用。

于 2019-08-29T14:43:03.667 回答
0

您的请求超时,因为您移除了障碍并且没有发送响应。

@Bean
public IntegrationFlow integrationFlow(RabbitTemplate amqpTemplate) {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .enrich(enricherSpec -> {
                enricherSpec.header("correlationId", 1); // ackCorrelationId
            })
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            // .barrier(1000L)
            .log()
            .publishSubscribeChannel(pubsub -> pubsub
                    .subscribe(f1 -> f1
                            .handle(Amqp.outboundAdapter(amqpTemplate)
                                    .exchangeName("barrierExchange")
                                    .routingKey("barrierKey")
                                    .confirmAckChannel(confirmAckChannel())
                                    .confirmCorrelationExpression("payload")))
                    .subscribe(f2 -> f2.handle((p, h) -> "Processed OK")))
            .get();
}

此外,当您实现障碍时,您的相关表达式必须是#this; 否则您将丢失障碍相关标头。

于 2019-08-29T14:46:17.827 回答