我正在尝试通过 Spring Integration 的 AMQP 输入/输出适配器发送/接收消息,但我遇到了这个问题。
在这里找到 Gary 的答案后,我开始调查我的应用是否正确设置了消息 ID。事实上,它在这里被自动处理了。
制作人是这样的。
我故意发送错误消息,在消费者端我看到它的消息转换器在这里失败。
之后,消息会重新排队并无休止地重新处理。
在调试这个问题时,我注意到发送和接收消息的 ID 总是不同的。
在进一步调试此事件后,我了解到 Spring 的核心消息传递框架提供的标准 ID 字段被标记为瞬态,并且瞬态标头永远不会被映射。
问题?
- 为什么框架不能自动映射
MessageHeaders.ID
到AmqpHeaders.MESSAGE_ID
? - 我应该将自定义标头字段作为 ID 并实施
MessageKeyGenerator
吗? - 顺便说一句,我将如何使用新的 Java DSL 来做到这一点?
非常感谢!
干杯,拉斯洛
更新:在无限循环中重新处理失败消息的原因是由其他原因引起的。
defaultRequeueRejected(false)
通过添加到 AMQP 入站适配器的侦听器容器配置,我设法解决了这个问题。
@Bean
public IntegrationFlow webhookInboundFlow(
ConnectionFactory connectionFactory, ObjectMapper objectMapper,
HeaderValueRouter webhookInboundRouter) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, FORGETME_WEBHOOK_QUEUE_NAME)
.configureContainer(s -> s.defaultRequeueRejected(false))
)
.log(INFO)
.transform(new ObjectToJsonNodeTransformer(objectMapper))
.route(webhookInboundRouter)
.get();
}