我有一个sftp:inbound-channel-adapter
将文件从远程文件夹同步到本地文件夹。当它发现一个扩展名为 .xml 的文件时,它会创建Message<File>
一个新事务并将其发送到routerChannel
通道。
查看approverRouter
xml 内部并确定该过程是否可以继续。如果一切正常,则将消息发送到“okChannel”并关闭到数据库进行保存。但是,如果事情不正常,则将消息发送到int:delayer
10 秒,然后重新提交到approverRouter
.
如果交易成功或失败sftpCommittedChannel
,只需将下载的文件移动到已完成/失败的文件夹中。sftpRolledBackChannel
这是一些描述这一点的xml:
<int-sftp:inbound-channel-adapter id="..." channel="routerChannel" local-filter="localOnlyXmlFilter" ... >
<int:poller fixed-rate="10000" max-messages-per-poll="-1">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<!-- Takes Message<Map> makes sure all files exist and are valid MD5 checked -->
<int:router input-channel="routerChannel" ref="approverRouter" method="resolveRoute"/>
<bean id="approverRouter" class="com.example.app.readers.ApproverRouter">
<property name="errorChannel" value="errorChannel"/>
<property name="retryChannel" value="myRetry"/>
<property name="okChannel" value="transformHashesToList"/>
<property name="timeout" value="${timout}"/>
</bean>
<int:delayer id="delayer" input-channel="myRetry" output-channel="routerChannel" default-delay="10000"/>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit channel="sftpCommittedChannel"/>
<int:after-rollback channel="sftpRolledBackChannel"/>
</int:transaction-synchronization-factory>
我的问题是,当消息发送到延迟器时,轮询器中创建的事务成功完成。我认为这是因为当消息到达int:delayer
(通过 retryChannel)时,它不再在同一个原始线程中。
我想我需要的是一个同步延迟器,它增加了延迟,但将消息保留在其原始事务中。这感觉就像我的设计不正确。
普通人会做什么来创建重试延迟并在成功或失败时仍整理 sftp 文件夹。