4

代码:https ://github.com/giuliopulina/spring-integration-poller

我在尝试使用 Spring 集成创建 jdbc 轮询器时遇到问题。

当我向表格提供新数据时,处理速度比预期的要慢:一切正常,除了轮询每 60 秒触发一次,我不明白为什么。

2015-05-27 10:50:40,234调试 ExpressionEvaluatingSqlParameterSourceFactory - 将表达式#root.![pk] 解析为(pks 列表)

2015-05-27 10:51:40,234调试 ExpressionEvaluatingSqlParameterSourceFactory - 将表达式#root.![pk] 解析为(pks 列表)

这是spring集成配置xml的相关部分:

<task:executor id="pollerPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/>

<!--<task:executor id="processingPool" pool-size="5-20" queue-capacity="0" rejection-policy="CALLER_RUNS" keep-alive="5"/> -->

<bean id="jdbcSource" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
    <constructor-arg ref="dataSource"/>
    <constructor-arg value="XXXXXXXXXXXXXX"/>
    <property name="updateSql" value="XXXXXXXXXXXXXXXX"/>
    <property name="maxRowsPerPoll" value="50"/>
</bean>

<int:inbound-channel-adapter send-timeout="10000" auto-startup="false" id="inboundAdapter" ref="jdbcSource" channel="jdbcOutputChannel">
    <int:poller receive-timeout="3000" time-unit="MILLISECONDS" fixed-rate="0" error-channel="errorChannel" task-executor="pollerPool">
        <int:advice-chain>
            <ref bean="threadPrepareInterceptor"/>
            <ref bean="txAdvice"/>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<int:service-activator id="serviceActivator" input-channel="jdbcOutputChannel" ref="someServiceActivatorBean"/>

<tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
        <tx:method name="get*" read-only="true"/>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>

<int:channel id="jdbcOutputChannel"  >
    <!-- using direct channel -->
    <!--<int:dispatcher task-executor="processingPool"/>-->
</int:channel>

你能帮我理解这个问题吗?

更新:

关于事务的“jdbcOutputChannel”建议,我同意并根据您的提示修改了我的配置,因为它更干净(无论如何,服务激活器也在单独的事务中运行,即使 xml 示例中没有列出)。

关于我遇到的问题,我尝试删除所有其他 Spring 集成组件,并且轮询器按我的预期连续触发(我知道固定速率 = 0 太高了:))相反,当项目中配置了其他轮询器时像这样,我的轮询器似乎也继承了相同的超时:

<int:service-activator id="someOtherServiceActivator">
    <int:poller fixed-rate="0" error-channel="someOtherPollerErrorChannel" receive-timeout="60000" />
</int:service-activator>

将其他轮询器的超时切换为 10000 毫秒,我的轮询器每 10 秒(而不是 60 秒)触发一次。我无法分享完整的 spring 集成配置,但我想问一下:完全分离的 poller 可以修改彼此的行为吗?

更新 2:我创建了一个单独的项目试图重现该问题,但我仍然无法做到。因此,我尝试删除以下配置,它是为了仅在应用程序完全启动并运行时才启动轮询器而引入的:

<int:publish-subscribe-channel id="startupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="startupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="startupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="startupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" /> 

而且问题已经消失了,即使这个我完全可以理解原因。无论如何,为我的轮询器创建一个不同的 startupChannel 效果很好:

<int:publish-subscribe-channel id="globalStartupChannel" />
<int:publish-subscribe-channel id="myStartupChannel" />
<int:control-bus input-channel="controlBusChannel" />

<int-event:inbound-channel-adapter channel="globalStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>
<int-event:inbound-channel-adapter channel="myStartupChannel" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:transformer input-channel="globalStartupChannel" expression="'@someOtherServiceActivator.start()'" output-channel="controlBusChannel" />
<int:transformer input-channel="myStartupChannel" expression="'@inboundAdapter.start()'" output-channel="controlBusChannel" />

更新 3:

在为您准备代码的项目时,我注意到以下日志:

信息:没有明确定义名为“taskScheduler”的 bean。因此,将创建一个默认的 ThreadPoolTask​​Scheduler。

因此,我添加了以下配置,现在一切正常:

<task:scheduler id="taskScheduler" pool-size="20" />

我猜默认池大小是 10,因此在某些情况下,当 totalNumberOfPollers > taskScheduler.size() 时,配置会被覆盖。我对吗?

谢谢朱利奥

4

1 回答 1

1

我无法重现您的情况;我建议您在轮询之间进行线程转储,以查看线程在做什么。

也就是说,a fixed-rateof 0 非常具有侵略性。您的 DBA 可能会毫无延迟地进行一次合适的轮询。

此外,jdbcOutputChannel作为ExecutorChannel, 意味着事务将在消息发送到该通道后立即提交。如果您希望流程在事务中运行,则不应在此处使用调度程序。

编辑:

我仍然无法用这个重现你的情况......

<int:control-bus input-channel="input"/>

<int-event:inbound-channel-adapter channel="ps" event-types="org.springframework.context.event.ContextRefreshedEvent"/>

<int:publish-subscribe-channel id="ps" />

<int:transformer input-channel="ps" output-channel="input" expression="'@foo.start()'" />

<int:transformer input-channel="ps" output-channel="input" expression="'@sa.start()'" />

<int:inbound-channel-adapter id="foo" channel="bar" expression="'foo'" auto-startup="false">
    <int:poller fixed-rate="1000" />
</int:inbound-channel-adapter>

<int:channel id="bar">
    <int:queue />
</int:channel>

<int:service-activator id="sa" input-channel="bar" output-channel="baz" auto-startup="false"
        expression="payload.toUpperCase()">
    <int:poller fixed-rate="6000" receive-timeout="0" />
</int:service-activator>

<int:logging-channel-adapter id="baz" level="ERROR"/>

...正如预期的那样,我FOO每 6 秒看到 6 秒(每秒i-c-a轮询一次,而 sa 每 6 秒运行一次)。

编辑2

我查看了您的项目,正如您所说,您的问题的根本原因是许多轮询端点,但实际上,是这样的:

fixed-rate="0" receive-timeout="60000"

使用此配置,调度程序资源(线程)在QueueChannels 中被阻塞,并且正如您所发现的,您已经耗尽了所有资源。

一种解决方案是增加调度程序池中的线程数。

使用此配置,您似乎正在尝试通过让轮询器在队列中的接收()方法中不断等待来获得与轮询器的按需、零延迟消息传递。

如果您无法承受任何延迟,请考虑使用DirectChannels 代替。如果您不希望下游端点在调用者的线程上运行,请使用ExecutorChannels...

<task:executor id="exec" pool-size="100"/>

<int:channel id="otherMessageChannel1">
    <int:dispatcher task-executor="exec" />
</int:channel>

这通常比您当前的设置更受欢迎。

于 2015-05-27T13:03:25.227 回答