我有一个弹簧集成流程定义如下的项目:
@Bean
public IntegrationFlow validationIntegrationFlow( ValidationService<String> validationService, ReleaseStrategy bundleReleaseStrategy ) {
return IntegrationFlows.from( FtpIntegrationFlowConfiguration.BANK_FTP_CHANNEL )
.split()
.enrichHeaders( headerEnricherSpec -> {
headerEnricherSpec.headerFunction( DESTINATION_CHANNEL_HEADER, message ->
validationService.validate( ( String ) message.getPayload() ) ?
BANK_FTP_VALID_CHANNEL : BANK_FTP_BAD_CHANNEL );
} )
.aggregate( aggregatorSpec -> aggregatorSpec
.correlationStrategy( message -> message.getHeaders().get( DESTINATION_CHANNEL_HEADER ) )
.releaseStrategy( bundleReleaseStrategy )
.expireGroupsUponCompletion( true )
.groupTimeout( 1000 )
.sendPartialResultOnExpiry( true ) )
.route( new HeaderValueRouter( DESTINATION_CHANNEL_HEADER ) )
.get();
}
它的作用是验证每条消息并分配正确的目标标头,然后将结果分组聚合并将其发送到路由器以分派到正确的通道
我有一个检查流的服务,以便在它启动时记录将发送多少消息,并且在 2 个目标通道之后,我有 2 个句柄告诉服务已处理 X 大小的捆绑包,
下面是我得到的日志样本
2016-12-09 17:04:00.176 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | synchronize | Start sync for correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49] with value [7956]
2016-12-09 17:04:01.397 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [6956]
2016-12-09 17:04:01.752 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [5956]
2016-12-09 17:04:02.114 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [4956]
2016-12-09 17:04:02.410 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [3956]
2016-12-09 17:04:02.681 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [2956]
2016-12-09 17:04:02.991 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [1956]
2016-12-09 17:04:03.344 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [956]
2016-12-09 17:04:05.538 | [taskScheduler-5] | INFO | org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler | expireGroup | Expiring MessageGroup with correlationKey[bad.channel]
2016-12-09 17:04:05.538 | [taskScheduler-5] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [955]
2016-12-09 17:04:05.590 | [taskScheduler-9] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [955] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [0]
2016-12-09 17:04:06.635 | [taskScheduler-2] | WARN | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | counter for [b5e69b73-aa99-4e9e-a7d2-59c015793a49] not found, ignoring subtract of [955]
正如您所看到的,在处理完所有消息后,在 ExpireGroup 日志告诉我创建了另一个包之后,我仍然会记录一个警告。
我尝试将日志放在聚合器之前,但那里没有出现重复的消息,谁能帮我配置聚合器,这样我就不会得到重复的消息?
ps:如果有帮助的话,我是如何实例化捆绑发布策略的
new TimeoutCountSequenceSizeReleaseStrategy( 100, 1000L );