我正在使用弹簧靴(2.5.5)。我看到使用 rabbitmq 队列侦听器的行为不一致。
问题是,accountService.incrementMediaCount即使neoPostService.createNeoPost抛出异常也会执行,因此会执行多次,因为重试策略设置为 5,我想这似乎很奇怪,因为只有在成功accountService.incrementMediaCount执行时才应该执行neoPostService.createNeoPost(无异常)。此外,loggingRepository.save()执行次数少于进入队列的消息数。是什么导致了这种行为?它与 RabbitMQ 的分布式特性有关吗?
队列监听器
@RabbitListener(queues = [RabbitConfiguration.Companion.Queues.POST_CREATION_QUEUE], concurrency = "3")
fun postCreationListener(postMessage: PostMessage) {
neoPostService.createNeoPost(postMessage.postId)
accountService.incrementMediaCount(postMessage.userId)
loggingRepository.save(
Log(
resourceType = ResourceType.Post,
entityEventType = EntityEventType.Created,
message = "Post ${postMessage.postId} created for user ${postMessage.userId}"
)
)
}
fun createNeoPost(postId: Long): NeoPost {
// Get post by id from postgresql
val post = postRepository.findByIdOrNull(postId)
?: throw ResourceNotFoundException("Post $postId not found")
val neoUser = neoUserService.getUserById(post.user.userId!!)
// Save post to Neo4j
return neoPostRepository.save(post.toNeoPost().copy(user = neoUser))
}
//Increment counter in postgresql
fun incrementMediaCount(userId: Long, amount: Int = 1) {
accountRepository.incrementPostCounter(userId, amount)
}
兔子配置
@EnableRabbit
@Configuration
class RabbitConfiguration {
// other code
....
@Bean
fun retryInterceptor(): RetryOperationsInterceptor? {
return RetryInterceptorBuilder.stateless()
.backOffOptions(100, 5.0, 10000)
.maxAttempts(5)
.recoverer(RejectAndDontRequeueRecoverer())
.build()
}
....
}