我有一个具有多个进程的(旧版)TCP 服务。每个进程在同一主机上运行,但在不同的端口上。该服务是单线程的,因此增加吞吐量的方法是在每个端口上循环每个请求。
我正在为这个遗留应用程序提供 AMQP 暴露。它非常简单——从 AMQP 队列中取出一个字符串,将其传递给应用程序,然后将响应字符串返回给 AMQP 回复队列。
这在单个端口上效果很好。但是,我想分散所有端口的请求。
Spring Integration 似乎只提供AbstractClientConnectionFactory
直接连接到单个主机/端口 ( TcpNetClientConnectionFactory
) 或维护到单个主机/端口 ( CachingClientConnectionFactory
) 的连接池的实现。单个主机和多个端口之间没有任何池连接。
我试图编写我自己的AbstractClientConnectionFactory
来维护AbstractClientConnectionFactory
对象池和它们之间的循环。但是,当目标服务消失或网络中断时,我遇到了几个我无法解决的处理 TCP 连接的问题。
这个问题也采用了方法:Spring Integration 4 - 在 Java DSL 中配置 LoadBalancingStrategy,但解决方案是硬编码端点的数量。就我而言,端点的数量仅在运行时才知道,并且是用户可配置的设置。
因此,基本上我需要在运行时动态TcpOutboundGateway
创建每个端口,并以某种方式将其注册到我的. 我尝试了以下方法:IntegrationFlow
@Bean
public IntegrationFlow xmlQueryWorkerIntegrationFlow() {
SimpleMessageListenerContainer inboundQueue = getMessageListenerContainer();
DirectChannel rabbitReplyChannel = MessageChannels.direct().get();
IntegrationFlowBuilder builder = IntegrationFlows
.from(Amqp.inboundGateway(inboundQueue)
.replyChannel(rabbitReplyChannel))
/* SOMEHOW DO THE ROUND ROBIN HERE */
//I have tried:
.channel(handlerChannel()) //doesnt work, the gateways dont get started and the message doesnt get sent to the gateway
//and I have also tried:
.handle(gateway1)
.handle(gateway2) //doesnt work, it chains the handlers instead of round-robining between them
//
.transform(new ObjectToStringTransformer())
.channel(rabbitReplyChannel);
return builder.get();
}
@Bean
//my attempt at dynamically adding handlers to the same channel and load balancing between them
public DirectChannel handlerChannel() {
DirectChannel channel = MessageChannels.direct().loadBalancer(new RoundRobinLoadBalancingStrategy()).get();
for (AbstractClientConnectionFactory factory : generateConnections()) {
channel.subscribe(generateTcpOutboundGateway(factory));
}
return channel;
}
有谁知道我该如何解决这个问题?