3

我有一个弹簧集成流程定义如下的项目:

@Bean
public IntegrationFlow validationIntegrationFlow( ValidationService<String> validationService, ReleaseStrategy bundleReleaseStrategy ) {
    return IntegrationFlows.from( FtpIntegrationFlowConfiguration.BANK_FTP_CHANNEL )                
            .split()
            .enrichHeaders( headerEnricherSpec -> {
                headerEnricherSpec.headerFunction( DESTINATION_CHANNEL_HEADER, message ->
                        validationService.validate( ( String ) message.getPayload() ) ?
                                BANK_FTP_VALID_CHANNEL : BANK_FTP_BAD_CHANNEL );
            } )
            .aggregate( aggregatorSpec -> aggregatorSpec
                    .correlationStrategy( message -> message.getHeaders().get( DESTINATION_CHANNEL_HEADER ) )
                    .releaseStrategy( bundleReleaseStrategy )
                    .expireGroupsUponCompletion( true )
                    .groupTimeout( 1000 )
                    .sendPartialResultOnExpiry( true ) )
            .route( new HeaderValueRouter( DESTINATION_CHANNEL_HEADER ) )
            .get();
}

它的作用是验证每条消息并分配正确的目标标头,然后将结果分组聚合并将其发送到路由器以分派到正确的通道

我有一个检查流的服务,以便在它启动时记录将发送多少消息,并且在 2 个目标通道之后,我有 2 个句柄告诉服务已处理 X 大小的捆绑包,

下面是我得到的日志样本

2016-12-09 17:04:00.176 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | synchronize | Start sync for correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49] with value [7956]
2016-12-09 17:04:01.397 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [6956]
2016-12-09 17:04:01.752 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [5956]
2016-12-09 17:04:02.114 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [4956]
2016-12-09 17:04:02.410 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [3956]
2016-12-09 17:04:02.681 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [2956]
2016-12-09 17:04:02.991 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [1956]
2016-12-09 17:04:03.344 | [taskScheduler-2] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1000] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [956]
2016-12-09 17:04:05.538 | [taskScheduler-5] | INFO | org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler | expireGroup | Expiring MessageGroup with correlationKey[bad.channel]
2016-12-09 17:04:05.538 | [taskScheduler-5] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [1] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [955]
2016-12-09 17:04:05.590 | [taskScheduler-9] | DEBUG | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | Subtract [955] from correlationId [b5e69b73-aa99-4e9e-a7d2-59c015793a49]. Result is [0]
2016-12-09 17:04:06.635 | [taskScheduler-2] | WARN | service.sync.impl.SynchronizationServiceInMemory | subtractAndGet | counter for [b5e69b73-aa99-4e9e-a7d2-59c015793a49] not found, ignoring subtract of [955]

正如您所看到的,在处理完所有消息后,在 ExpireGroup 日志告诉我创建了另一个包之后,我仍然会记录一个警告。

我尝试将日志放在聚合器之前,但那里没有出现重复的消息,谁能帮我配置聚合器,这样我就不会得到重复的消息?

ps:如果有帮助的话,我是如何实例化捆绑发布策略的

new TimeoutCountSequenceSizeReleaseStrategy( 100, 1000L );
4

0 回答 0