问题标签 [spring-integration-amqp]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
709 浏览

rabbitmq - 当 RabbitMQ 交换不存在时如何处理错误(并且消息是通过消息传递网关接口发送的)

我想知道在以下情况下处理错误的规范方法是什么(代码是一个最小的工作示例):

  • 消息通过消息网关发送,该网关定义了它defaultRequestChannel的方法和@Gateway方法:
  • 从通道读取消息并通过 AMQP 出站适配器发送:
  • RabbitMQ 配置是骨架:

我通常包含一个 bean 来定义我所依赖的 RabbitMQ 配置(交换、队列和绑定),它实际上工作正常。但是在测试失败场景时,我发现了一种我不知道如何正确处理使用 Spring Integration 的情况。步骤是:

  • 删除配置 RabbitMQ 的 bean
  • 针对未配置的 vanilla RabbitMQ 实例运行流程。

我期望的是:

  • 无法传递消息,因为找不到交换。
  • 要么我找到某种方法从调用者线程上的消息传递网关获取异常。
  • 要么我找到一些方法来拦截这个错误。

我发现:

  • 无法传递消息,因为找不到交换,并且确实每次@Gateway调用该方法时都会记录此错误消息。
  • 网关没有失败,我也没有找到配置它的方法(例如:向throws接口方法添加子句,配置事务通道,设置wait-for-confirm和 a confirm-timeout)。
  • 我还没有找到其他方法来捕获该CachingConectionFactory错误(例如:配置事务通道)。
  • errorChannel我还没有找到在另一个通道(在网关上指定)或 Spring Integration 的默认值中捕获错误消息的方法errorChannel

我知道这种故障可能不会被消息传递网关向上游传播,它的工作是将调用者与消息传递 API 隔离开来,但我绝对希望这样的错误是可以拦截的。

你能指出我正确的方向吗?

谢谢你。

0 投票
1 回答
79 浏览

spring-integration - AMQP Inbound 和 JMS Outbound 之间的全局事务

我们需要在 RabbitMQ 和 MQSeries 之间传输消息。

在此处输入图像描述 为此,我们使用下面的配置。

我们要确认 RabbitMQ 队列中的消息,以防 MQSeries 中的写入良好。为此,我们channel-transacted在 amqp 入站和session-transactedjms 出站上使用该属性。

这是正确的方法吗?

我们如何测试消息在 MQSeries 队列中写入良好(蓝色箭头)但在向 RabbitMQ 确认期间发生错误(绿色箭头)的场景?那么是否可以在 MQSeries 上回滚?并使用来自 RabbitMQ 的此消息重试。

谢谢你的帮助。

0 投票
1 回答
294 浏览

spring-integration - SimpleMessageListenerContainer - AMQP 接收消息上的 ClassNotFoundException 警告

我刚刚将我的应用程序更新到版本spring-integration-amqp-5.2.4.RELEASE,当我从队列中收到消息时一切正常,但我注意到课堂上的警告java.lang.ClassNotFoundExceptionDefaultAmqpHeaderMapper我使用自定义DefaultJackson2JavaTypeMapperIdClassMapping. 如何避免跟踪警告?

日志警告

0 投票
1 回答
73 浏览

java - 是否可以在 Spring Integration 中使 MessageHeaders.ID 不是瞬态的?

我正在尝试通过 Spring Cloud Stream Source 发送消息。接收消息的应用程序需要存在id标头。我无法更改此要求,因此我需要找到一种方法来确保将id标头映射到我的传出消息。

我注意到id标题被映射到一个message_id属性。经过一番挖掘,我意识到这是因为标头在这里id被声明为瞬态。

有没有办法确保id在发送消息时标头仍然存在?

0 投票
1 回答
347 浏览

rabbitmq - RabbitTemplate 的 setChannelTransacted 标志导致消息未传递到队列

鉴于我有 AMQP 匿名队列和扇出交换的应用程序:

和 Spring 集成流程:

我使用出站适配器:

我可以在日志中看到:

但消息未传递到绑定cache.update.fanout交换的队列。

当我rabbitTemplate.setChannelTransacted(false);在出站适配器中设置时,我可以在日志中看到:

并且消息被传递到队列。

为什么在第一种情况下没有传递消息?

为什么 RabbitTemplate 不指示某些内容?

0 投票
1 回答
51 浏览

spring - Spring 集成 AMQP

我刚刚开始学习 spring-integration 我想在队列上接收消息并并行执行 2 个步骤:步骤 1 -> 使用 bean 处理它步骤 2 -> 转换并将其发送到另一个队列。就像是 :

我错过了什么?第一个句柄之后的动作没有被执行。我想我没有正确理解这部分。另外我怎样才能并行执行这两个步骤?

0 投票
1 回答
35 浏览

spring - Spring Integration 回复 Publisher

我使用 DSL 在 Spring Integration 中实现了 2 个流程:

  • REST -> AMQP -> 消费者 -> 服务
  • AMQP -> 消费者 -> 服务

第一个流程只是一个 HTTP 消息发布器,用于无法直接发布到 AMQP 的客户端。消息处理在Service中失败时,如何让发布者知道?

我正在使用DirectChannel查看Publisher ConfirmsService Acks模式,以便Publisher可以同步接收错误消息,如果我理解正确的话。但是,这将阻止发布者,直到服务返回(或抛出异常)。Spring Integration 中有哪些选项(因为它是基于 EIP 的)来处理应该通知发布者消息处理失败而不被阻止的情况?这也是一个设计问题。

0 投票
1 回答
143 浏览

rabbitmq - 将无效格式的 XML 消息停放在 AMQP 停车场队列中

鉴于我有 IntegrationFlow

deathCheckHandler在哪里

DeathCheckHandler处理在 AMQP 队列上设置的死信。

如何以不正确的格式(即MarshallingMessageConverterthrows时)停放 XML 消息UnmarshallingFailureException

我想以类似的方式停放它DeathCheckHandler#parkMessage

应该是可能的ConditionalRejectingErrorHandler,但我不知道如何。

0 投票
2 回答
217 浏览

spring-integration - 告诉 Jackson2JsonMessageConverter 在 Amqp.inboundAdapter 中使用我自己的类

鉴于我有IntegrationFlowAMQP 队列的入站适配器:

当一些外部系统发送带有 JSON 正文的消息时,我发现他们使用__TypeId__=THEIR_INTERNAL_CLASS.

我想将 JSON 正文映射到我自己的类。

目前,它ClassCastException由于THEIR_INTERNAL_CLASS不可用而失败。

我怎么知道Jackson2JsonMessageConverter使用我自己的课程?

0 投票
1 回答
39 浏览

spring-integration - 在 AMQP 入站适配器中过滤掉 AMQP 消息的标头

鉴于我有IntegrationFlow

我想将HeaderFilter应用于 AMQP 入站适配器,但似乎我只能在管道的稍后部分执行此操作。

是否可以过滤入站适配器中的标头?