问题标签 [spring-integration-amqp]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
318 浏览

rabbitmq - 如何配置与多个 RabbitMQ 节点的连接?

我可以通过两种方式配置 RabbitMQ 客户端连接:

  1. 逗号分隔的 IP 地址列表
  2. 具有多个 IP 地址的 DNS 记录的主机名

当连接的节点失败时,RabbitMQ 客户端会连接到另一个节点吗?

在这两种情况下都会这样做吗?

0 投票
1 回答
678 浏览

spring-integration - 如何将 AWS SQS 入站适配器连接到 AMQP 支持的消息通道?

我已经通过 Java 使用直接通道成功地为 AWS SQS 配置了入站通道适配器。该项目使用 JDBC 和 RabbitMQ 与 SQS 的组合作为 3 个独立队列的入站流。我需要持久消息,所以我试图弄清楚如何利用 RabbitMQ。我对如何引用 AMQP 通道感到困惑。如何从 setOutPutChannel 引用 AMQP 消息通道?我的目标是仅在消息成功发布到 RabbitMQ 持久队列时才将消息从 SQS 中删除。

0 投票
1 回答
67 浏览

spring-integration - 消息传递到 QueueChannel 后如何处理错误?

我有 10 个 rabbitMQ 队列,分别称为 event.q.0、event.q.2、<...>、event.q.9。这些队列中的每一个都接收从 event.consistent-hash 交换路由的消息。我想构建一个容错解决方案,它将按顺序使用特定事件的消息,因为排序很重要。为此,我设置了一个流程,用于侦听这些队列并根据事件 ID 将消息路由到特定的工作流程。工作流程基于队列通道工作,因此应保证具有特定 ID 的事件的 FIFO 顺序。我想出了以下设置:

现在,当我们说在 中发生错误时eventConsumerFlow,重试机制按预期工作,但是当在 中发生错误时workerFlow,重试不再起作用并且消息不会被发送到死信交换。我认为这是因为一旦将消息传递给 QueueChannel,它就会自动得到确认。我怎样才能使重试机制也起作用workerFlow,以便如果那里发生异常,它可以重试几次并在尝试用尽时向 DLX 发送消息?

0 投票
0 回答
200 浏览

spring-integration - Spring 集成 InboundAdapter/OutboundAdapter 流程

我正在尝试使用 Amqp.InboundAdapter 从 RabbitMQ 队列中读取数据,对其进行处理,然后将其推送到另一个队列中。

所以流程看起来像这样:

Amqp.InboundAdapter --> 服务激活器 --> Amqp.outboundAdapter

但问题是处理速度非常非常慢。rabbitmq 控制台的传输速率是 0.2/sec 或 1sec,非常慢。

什么可能导致这种缓慢?

请注意,我在 Amqp.OutboundAdapter 中使用 rabbitTemplate,它的 usePublisherConnection 设置为 True。

当我不使用 spring 集成时,rabbitmq 控制台中的消息速率大约为 1000 msg/sec。

0 投票
2 回答
447 浏览

spring-integration - 如何将对象“消息”传递给路由?

我创建了一个流,它使用来自 RabbitMQ 的消息,然后使用路由器按类型分配给适当的服务。服务中的方法需要参数Message<?>,因为我需要在那里使用标头。但是在这种方法中,我只收到带有 type 的消息有效负载,java.lang.String并且org.springframework.messaging.Message收到错误消息java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message

Payload 不适合我,因为我需要从消息中获取标头。

该方法的签名handleProcedure如下:

void handleProcedure(Message<?> message)

我希望Message在方法中获得标题handleProcedure,但现在我得到了异常。

0 投票
1 回答
275 浏览

spring-boot - Spring Integration ServiceActivator 是否适用于项目反应器类型?

我正在使用 Spring Integration 5.1.5(与 RabbitMQ 一起使用spring-integration-amqp)并且我在文档中读到 Spring Integration 支持项目反应器类型(我的意思是MonoFlux等等)。但我不能让它为ServiceActivator工作。我正在尝试这样的事情:

(请注意,我也在尝试myMethod使用,Flux<Message>但这是一个单独的问题)。

myMethod返回时我Mono<Void>收到此错误:

将方法更改为:

并手动订阅反应流将使其工作,但这显然不是我想要做的。我宁愿期望spring-integration框架来处理订阅。

这在 Spring Integration 中是否支持?如果是这样,我做错了什么?

0 投票
1 回答
386 浏览

spring - 入站通道适配器抛出“没有可用的输出通道或回复通道标头”错误

我想从 JMS 获取消息并将其作为 HTTP 请求发送,如果失败,请再次将其排队到 JMS。

我尝试使用入站消息适配器和消息驱动通道适配器,但它失败了,因为我得到“ChannelResolutionException:没有可用的输出通道或replyChannel 标头”异常,但因为我不想回复入站消息-adapter,不知道为什么要包含一个replyChannel 标头

我希望在正常运行中取得成功,但出现错误

此外,如果 http:outbound-gateway 响应为 null 或第 3 方服务关闭,则希望将消息再次排入 jms 队列。

0 投票
1 回答
737 浏览

spring - 找不到交换时的Spring AMQP出站适配器行为

我有一个连接 JMS 侦听器的 spring 集成代码(侦听 mq 系列本地队列)

并通过一个将消息转发给rabbitmq(不受我控制)

我试图保持这种事务性意味着如果 rabbitmq 没有收到消息,我想将其保留在 MQ 系列本地队列中。但是我注意到,如果 rabbitmq 配置中提到的交换不存在,我会在日志中看到这一行:

但是对于 mq 系列本地队列,我的消息消失了。

如果rabbitmq代理失败或交换不存在,我该怎么做才能重试发送消息?

感谢您的帮助,我的配置:

0 投票
1 回答
643 浏览

java - 如何使用spring集成java DSL向rabbitmq队列发送消息

我编写了简单的示例来从控制台读取文本并将其发送到 rabbitMq 服务器:

它看起来几乎是可行的解决方案,但我my_spring_integration_queue在 rabbitmq 管理控制台中找不到:

在此处输入图像描述

但我在其他选项卡上找不到与“my_spring_integration_queue”相关的任何内容。我在哪里可以找到它?

如果它不存在,我希望该应用程序将创建队列。我找不到发送到队列的方法,所以我使用了.routingKey方法。我也尝试过.exchangeName方法,但它导致:

附言

队列选项卡如下所示:

在此处输入图像描述

0 投票
0 回答
160 浏览

spring-integration - IntegrationFlow Amqp 通道适配器在句柄()中不起作用

我正在研究两个 RabbitMQ 消息代理之间的集成流程。

我的 IntegrationFlow 代码是:

入站适配器代码是:

我收到消息。消息正在发生变化。但是,转换后的消息没有到达另一个 RabbitMQ

rabbitTemplate 代码是:

我的 IntegrationFlow 出了什么问题?

谢谢,

马赫什