0

概括

我正在使用 Spring Integration 的TCP 和 UDP 支持将 TCP 流流量通过我的应用程序代理到上游服务器,然后将该服务器的响应通过我的应用程序代理回客户端。虽然这是双向通信,但我需要大容量的异步吞吐量,所以我不能使用网关。相反,我正在尝试使用第 34.8.2 节中描述的协作出站和入站通道适配器。

集成组件设置

要求

ATcpReceivingChannelAdapter通过端口 6060 接收请求。TcpNetServerConnectionFactory它将这些请求放在 requests 上QueueChannel。请求由 a 接收TcpSendingMessageHandler,它通过 a 生成的客户端连接发送请求TcpNetClientConnectionFactory。此连接将请求从我的应用程序发送到上游服务器。

回复

ATcpReceivingChannelAdapter通过连接从上游服务器接收响应。TcpNetClientConnectionFactory它将这些响应放在响应上QueueChannel。响应由 a 拾取TcpSendingMessageHandler,它尝试通过来自原始 的连接将响应发送回客户端TcpNetServerConnectionFactory。这个最终的连接是失败的。

    @Bean
    public PollableChannel requestChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public PollableChannel replyChannel() {
        return new QueueChannel(1000);
    }

    @Bean
    public TcpNetServerConnectionFactory serverFactory() {
        TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(6060);
        serverFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        serverFactory.setSingleUse(false);
        return serverFactory;
    }

    @Bean
    public TcpNetClientConnectionFactory clientFactory() {
        TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("127.0.0.1", 6080);
        clientFactory.setSerializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setDeserializer(new ByteArrayLengthHeaderSerializer(2));
        clientFactory.setSingleUse(false);
        return clientFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundRequestAdapter() {
        TcpReceivingChannelAdapter inboundRequestAdapter = new TcpReceivingChannelAdapter();
        inboundRequestAdapter.setConnectionFactory(serverFactory());
        inboundRequestAdapter.setOutputChannel(requestChannel());
        return inboundRequestAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "requestChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundRequestAdapter() {
        TcpSendingMessageHandler outboundRequestAdapter = new TcpSendingMessageHandler();
        outboundRequestAdapter.setConnectionFactory(clientFactory());
        return outboundRequestAdapter;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundReplyAdapter() {
        TcpReceivingChannelAdapter inboundReplyAdapter = new TcpReceivingChannelAdapter();
        inboundReplyAdapter.setConnectionFactory(clientFactory());
        inboundReplyAdapter.setOutputChannel(replyChannel());
        return inboundReplyAdapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel", poller = @Poller(fixedDelay = "50", receiveTimeout = "5000"))
    public TcpSendingMessageHandler outboundReplyAdapter() {
        TcpSendingMessageHandler outboundReplyAdapter = new TcpSendingMessageHandler();
        outboundReplyAdapter.setConnectionFactory(serverFactory());
        return outboundReplyAdapter;
    }

实际结果

错误:

Unable to find outbound socket for GenericMessage

完整的堆栈跟踪:

2019-02-01 14:10:55.315 ERROR 32553 --- [ask-scheduler-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : Unable to find outbound socket for GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
2019-02-01 14:10:55.319 ERROR 32553 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, failedMessage=GenericMessage [payload=byte[297], headers={ip_tcp_remotePort=6080, ip_connectionId=localhost:6080:51339:a3f66802-b194-4564-99c7-f194e55ddb11, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=bc36ec21-e2ae-405e-afa9-c0ec2f2eff8d, ip_hostname=localhost, timestamp=1549051855315}]
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这是有道理的。我知道 a在转发消息时会TcpReceivingChannelAdapter设置ip_connectionIdMessage 标头字段。由于我现在没有任何关联逻辑,因此当负载被上游代理时,来自第一个入站适配器的 ID 标头会丢失,而第二个入站适配器会生成一个新的 ID 标头。

结果,当回复返回到最终的出站适配器时,ID 标头与相应的入站适配器所知道的任何内容都不匹配。因此,它不知道使用哪个连接来发送响应。

我的问题是:有什么方法可以设置“默认”连接,或者在不向上游发送的情况下通过关联数据来增加有效负载?

问题是我的应用程序必须是关于上游服务器的透明代理。如果我用相关数据完全增加有效负载,上游服务器将拒绝它。

4

1 回答 1

0

如果没有包含相关信息的数据,就很难关联请求/回复。

可以这样TcpOutboundGateway做是因为套接字本身用于关联;每个套接字上一次只能有一个未完成的请求。通过CachingClientConnectionFactory维护套接字池允许网关中的并发性。

一种技术可能是自定义客户端连接工厂,它维护服务器工厂连接和传出连接之间的一对一映射。然后,当收到回复时,查找要向其发送回复的相应服务器工厂连接。它只需要几个映射 - 服务器连接 ID 到客户端连接,客户端连接 ID 到服务器连接 ID。

如果您提出解决方案,请考虑将其贡献回框架。

于 2019-02-01T21:02:26.780 回答