0

我正在使用 spring 集成通过提供一些消息并获得响应来对服务器进行 TCP 调用。我更喜欢使用通道适配器来发送和接收批量消息。我面临的问题是响应渠道。为响应频道获取“调度程序没有频道订阅者”。一切正常,除了响应没有在响应通道上传输。我可以看到服务器上的握手以及日志中的响应被放在响应和记录器通道上。但是在抛出异常之后。配置设置是:

<gateway id="clientPositionsGateway" service-interface="MyGatewayInterface">
        <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" />  
    </gateway>


    <channel id="clientPositionsRequestChannel" />

    <splitter input-channel="clientPositionsRequestChannel"
            output-channel="singleClientPositionsRequestChannel" />

    <channel id = "singleClientPositionsRequestChannel" />

    <transformer
        input-channel="singleClientPositionsRequestChannel"
        output-channel="dmQueryRequestChannel"
        ref="dmPosBaseQueryTransformer" />

    <channel id = "dmQueryRequestChannel">
        <!-- <dispatcher task-executor="executor"/> -->
    </channel>

    <ip:tcp-connection-factory id="csClient"
           type="client"
           host="somehost"
           port="12345"
           single-use="true"
           deserializer="connectionSerializeDeserialize"
            />

    <ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
            channel="dmQueryRequestChannel"
            connection-factory="csClient"
            order="2"
            />

    <ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
            channel="dmQueryResponseChannel"
            connection-factory="csClient"
            error-channel="errorChannel"/>

<channel id="dmQueryResponseChannel"/>
4

6 回答 6

1

正如 Artem 在他的评论中所说,“调度程序没有订阅者”意味着这里没有配置端点来接收响应dmQueryResponseChannel,或者配置了该通道作为其输入通道的端点没有启动。

在任何情况下,即使您解决了这个问题,为请求/响应场景使用独立的适配器也很棘手,因为框架无法自动将响应与请求相关联。这就是出站网关的用途。您可以使用协作适配器,但您必须自己处理相关性。如果您使用请求/回复网关来启动流程,则必须使用诸如tcp-client-server-multiplex 示例中探索的技术。这是因为使用独立适配器意味着您将丢失replyChannel用于将响应返回到网关的标头。

或者,您可以使用 void 返回网关来发送请求,<int:outbound-channel-adapter/>这样框架将使用响应进行回调,您可以通过编程方式进行自己的关联。

于 2013-09-20T14:43:33.897 回答
0

这是解决我的问题的代码:

    @ContextConfiguration(locations={"/clientGIM2Position.xml"})
    @RunWith(SpringJUnit4ClassRunner.class)

    public class GetClientPositionsTest {

    @Autowired

    ClientPositionsGateway clientPositionsGateway;

    @Test

    public void testGetPositions() throws Exception {

    String positions = clientPositionsGateway.fetchClientPositions(clientList);

    System.out.println("returned !!!!" + positions);

           }
    }
于 2013-09-21T12:23:38.397 回答
0
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/integration" 
    xmlns:ip="http://www.springframework.org/schema/integration/ip"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- intercept and log every message -->

    <logging-channel-adapter id="logger" level="DEBUG" />
    <wire-tap channel = "logger" />

    <gateway id="clientPositionsGateway"
         service-interface="com.example.ClientPositionsGateway">
        <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="dmQueryResponseChannel"/>
    </gateway>

    <channel id="clientPositionsRequestChannel" />

    <splitter input-channel="clientPositionsRequestChannel"
            output-channel="singleClientPositionsRequestChannel" />

    <channel id = "singleClientPositionsRequestChannel" />

    <transformer
        input-channel="singleClientPositionsRequestChannel"
        output-channel="dmQueryRequestChannel"
        ref="dmPosBaseTransQueryTransformer" />

    <channel id = "dmQueryRequestChannel">
        <dispatcher task-executor="executor"/>
    </channel>

    <ip:tcp-connection-factory id="csClient" 
           type="client" 
           host="hostserver"
           port="22010"
           single-use="true"
           deserializer="connectionSerializeDeserialize"
           />

    <ip:tcp-outbound-gateway id="dmServerGateway" 
           request-channel="dmQueryRequestChannel" 
           reply-channel="dmQueryResponseChannel"
           connection-factory="csClient"  />

    <channel id="dmQueryResponseChannel">
        <dispatcher task-executor="executor"/>
    </channel>

    <channel id="serverBytes2StringChannel" />

    <bean id="connectionSerializeDeserialize" class="com.example.DMQueryResponseSerializer"/>
    <bean id="dmPosBaseTransQueryTransformer" class="com.example.DMPOSBaseTransQueryTransformer"/> 
    <task:executor id="executor" pool-size="5"/>

</beans:beans>
于 2013-09-21T16:31:56.120 回答
0

配置设置:

<gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway">
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel"  reply-channel="dmQueryResponseChannel"/>  
</gateway>

<channel id="clientPositionsRequestChannel" />

<splitter input-channel="clientPositionsRequestChannel"
        output-channel="singleClientPositionsRequestChannel" />

<channel id = "singleClientPositionsRequestChannel" />

<transformer
    input-channel="singleClientPositionsRequestChannel"
    output-channel="dmQueryRequestChannel"
    ref="dmPosBaseQueryTransformer" />

<logging-channel-adapter channel="clientBytes2StringChannel"/>  

<channel id = "dmQueryRequestChannel">
     <dispatcher task-executor="executor"/> 
</channel>

<ip:tcp-connection-factory id="csClient" 
       type="client" 
       host="serverHost"
       port="22010"
       single-use="true"
       deserializer="connectionSerializeDeserialize"
       />

<ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
        channel="dmQueryRequestChannel" 
        connection-factory="csClient"
        /> 

<ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
        channel="dmQueryResponseChannel" 
        connection-factory="csClient" 
        error-channel="errorChannel"/>

<transformer input-channel="dmQueryResponseChannel" output-channel="clientBytes2StringChannel" ref="dmPOSBaseQueryResponseTransformer"
        />

<channel id="dmQueryResponseChannel"/>

<channel id="clientBytes2StringChannel"/>
于 2013-09-21T12:14:35.380 回答
0
public interface ClientPositionsGateway {

    String fetchClientPositions(List<String> clientList); 

}
于 2013-09-21T12:20:06.053 回答
0

如果您的clientPositionsGateway是从客户端线程调用的,则没有理由使用执行程序通道。如果你做百万循环clientPositionsGateway尝试使用未来网关:http ://docs.spring.io/spring-integration/docs/2.2.5.RELEASE/reference/html/messaging-endpoints-chapter.html#async-gateway和再次:没有执行者通道。而且我看不出有理由 reply-channel在两个网关上都使用。还有一个:你有<splitter>before <tcp:outbound-gateway>,但是<aggregator>after 在哪里<tcp:outbound-gateway>?..在你目前的情况下,你从你的clientPositionsGateway得到回复,所有其他人都将被删除,因为TemporaryReplyChannel已经关闭。

于 2013-09-21T17:24:27.420 回答