0

我正在尝试通过 Spring Integration 的 AMQP 输入/输出适配器发送/接收消息,但我遇到了这个问题

在这里找到 Gary 的答案后,我开始调查我的应用是否正确设置了消息 ID。事实上,它在这里被自动处理了。

制作人是这样的。

我故意发送错误消息,在消费者端我看到它的消息转换器在这里失败。

之后,消息会重新排队并无休止地重新处理。

在调试这个问题时,我注意到发送和接收消息的 ID 总是不同的。

在进一步调试此事件后,我了解到 Spring 的核心消息传递框架提供的标准 ID 字段被标记为瞬态,并且瞬态标头永远不会被映射

问题?

  • 为什么框架不能自动映射MessageHeaders.IDAmqpHeaders.MESSAGE_ID
  • 我应该将自定义标头字段作为 ID 并实施MessageKeyGenerator吗?
  • 顺便说一句,我将如何使用新的 Java DSL 来做到这一点?

非常感谢!

干杯,拉斯洛


更新:在无限循环中重新处理失败消息的原因是由其他原因引起的。

defaultRequeueRejected(false)通过添加到 AMQP 入站适配器的侦听器容器配置,我设法解决了这个问题。

@Bean
public IntegrationFlow webhookInboundFlow(
    ConnectionFactory connectionFactory, ObjectMapper objectMapper,
    HeaderValueRouter webhookInboundRouter) {

  return IntegrationFlows
      .from(Amqp.inboundAdapter(connectionFactory, FORGETME_WEBHOOK_QUEUE_NAME)
          .configureContainer(s -> s.defaultRequeueRejected(false))
      )
      .log(INFO)
      .transform(new ObjectToJsonNodeTransformer(objectMapper))
      .route(webhookInboundRouter)
      .get();
}
4

1 回答 1

1

为什么框架不能自动将 MessageHeaders.ID 映射到 AmqpHeaders.MESSAGE_ID?

这不是一个坏主意,至少作为一种选择。随意打开一个“改进” JIRA 问题

Spring AMQPAbstractMessageConverter具有createMessageIds生成消息 id 标头的属性;这独立于 Spring Integration。

所以通常,如果你想要一个消息 id 标头,你会在出站适配器的RabbitTemplate.

于 2018-05-26T18:12:51.217 回答