0

我正在使用弹簧靴(2.5.5)。我看到使用 rabbitmq 队列侦听器的行为不一致。

问题是,accountService.incrementMediaCount即使neoPostService.createNeoPost抛出异常也会执行,因此会执行多次,因为重试策略设置为 5,我想这似乎很奇怪,因为只有在成功accountService.incrementMediaCount执行时才应该执行neoPostService.createNeoPost(无异常)。此外,loggingRepository.save()执行次数少于进入队列的消息数。是什么导致了这种行为?它与 RabbitMQ 的分布式特性有关吗?

队列监听器

@RabbitListener(queues = [RabbitConfiguration.Companion.Queues.POST_CREATION_QUEUE], concurrency = "3")
fun postCreationListener(postMessage: PostMessage) {
    neoPostService.createNeoPost(postMessage.postId)
    accountService.incrementMediaCount(postMessage.userId)
    loggingRepository.save(
        Log(
            resourceType = ResourceType.Post,
            entityEventType = EntityEventType.Created,
            message = "Post ${postMessage.postId} created for user ${postMessage.userId}"
        )
    )
}
fun createNeoPost(postId: Long): NeoPost {
// Get post by id from postgresql
    val post = postRepository.findByIdOrNull(postId)
        ?: throw ResourceNotFoundException("Post $postId not found")

    val neoUser = neoUserService.getUserById(post.user.userId!!)
// Save post to Neo4j
    return neoPostRepository.save(post.toNeoPost().copy(user = neoUser))
}
//Increment counter in postgresql
fun incrementMediaCount(userId: Long, amount: Int = 1) {
    accountRepository.incrementPostCounter(userId, amount)
}

兔子配置

@EnableRabbit
@Configuration
class RabbitConfiguration {
    // other code
    ....
    @Bean
    fun retryInterceptor(): RetryOperationsInterceptor? {
        return RetryInterceptorBuilder.stateless()
            .backOffOptions(100, 5.0, 10000)
            .maxAttempts(5)
            .recoverer(RejectAndDontRequeueRecoverer())
            .build()
    }
    ....
}
4

0 回答 0