问题标签 [spring-integration-amqp]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
899 浏览

spring-integration - SimpleMessageListenerContainer 关闭超时

我正在使用 spring-rabbit-1.7.3.RELEASE.jar

我在我的 xml 中使用 shutdownTimeout 参数定义了一个 SimpleMessageListenerContainer。

当我的服务关闭并且“aQueue”中仍有消息时,我希望 shutdownTimeout 将允许消息得到处理。但这似乎不会发生。

经过进一步调查,我发现 SimpleMessageListenerContainer 中定义的 await() 方法总是返回 true。

我想了解 await 的逻辑是如何工作的,它是如何获取锁的,以及我需要什么额外的配置才能使代码正常工作。

0 投票
1 回答
1018 浏览

spring - 在 Spring 集成中使用 RPC 的 RabbitMQ 示例

在搜索了实现它的不同方法后,我被卡住了。我正在寻找的是通过 Spring Integration实现这个示例( https://www.rabbitmq.com/tutorials/tutorial-six-spring-amqp.html )。我发现了有趣的帖子(Spring 与 Rabbit AMQP 集成,用于“客户端发送消息 - > 服务器接收并在返回队列上返回消息 - > 客户端获取相关消息”)但没有帮助我解决我的需要。我的案例工厂是一个系统,其中客户端调用“convertSendAndReceive”方法并且服务器(基于 Spring Integration)将响应。

谢谢

0 投票
1 回答
527 浏览

spring-integration - spring 集成 amqp 侦听器已处理

当消息侦听器容器上的 channelTransacted = true 时,确认模式.AUTO 和确认模式.手动有什么区别?

我知道当我想使用带有 channelTransacted=false 的 acknowledgeMode.MANUAL 时,侦听器必须通过调用 Channel.basicAck() 来确认消息。当侦听器容器具有 channelTransacted = true 时,是否不需要使用 acknowledgeMode.MANUAL?因为当我将 acknowledgeMode.MANUAL 和 channelTransacted=true 与调用 Channel.basicAck() 结合起来时,我得到了异常。

例外

0 投票
0 回答
77 浏览

spring-cloud-stream - 了解Spring Cloud Stream+AMQP中的上下文切换

我们在项目中使用spring-integration-amqp-4.3.10.RELEASE、spring-cloud-stream-1.3.0.RELEASE 和 spring-cloud-starter-stream-rabbit。我们试图通过以下方式优雅地关闭我们的微服务已shutdown-timeout放置在我们的入站通道适配器中。

场景

  1. 我们只能为没有 spring-cloud-stream 的微服务实现优雅关闭。

    • 我们分析它在同一个侦听器线程中运行,因此它按预期终止。
  2. 当我们尝试终止微服务时(具有 spring-cloud-stream + AMQP)

    • 我们不确定这里是否发生了线程转移,如果发生了,在这种情况下如何实现关闭。

你能建议我们克服第二种情况的方法吗?

0 投票
1 回答
477 浏览

spring-integration - 上下文中的并发消费者到弹簧集成

在 Spring 集成应用程序中,我使用并发消费者一次消费和处理多条消息。在我的应用程序中,我将所有 bean 配置为一个单例。我假设我是否要通过使用并发消费者的并行化处理,多条消息输入到相同的集成组件中。它会导致两个对象之间的数据冲突吗?

0 投票
1 回答
2134 浏览

java - Spring 集成 Java DSL - 动态创建 IntegrationFlows

我正在使用 Spring Boot 1.5.13.RELEASE 和 Spring Integration 4.3.16.RELEASE 开发应用程序。

我对 Spring Integration 很陌生,遇到了一个问题。

所以基本的想法是,在一些外部触发器(可能是和 HTTP 调用)上,我需要创建一个 IntegrationFlow,它将使用来自 rabbitMQ 队列的消息,使用它们做一些工作,然后(可能)生成到另一个 rabbitMQ 端点。

现在这应该发生很多次,所以我将不得不创建多个集成流。

我正在使用IntegrationFlowContext来注册每个 IntegrationFlow,如下所示:

我必须澄清这可以同时发生,同时创建多个集成流。

所以每次我尝试创建一个集成流时,我的“源”是一个看起来像这样的网关:

它不是@Bean(还),但我希望它在每个 IntegrationFlow 注册时都能注册。

我的“目标”是一个 AmqpOutBoundAdapter,如下所示:

现在这个已经是一个bean,并且每次我尝试创建一个流时都会被注入。

我的流程看起来像这样:

我的问题是,当应用程序启动正常并创建第一个 IntegrationFlows 等时。稍后,我遇到了这种异常:

java.lang.IllegalStateException:无法在bean名称“org.springframework.integration.transformer.MessageTransformingHandler#872”下注册对象[org.springframework.integration.transformer.MessageTransformingHandler#872]:已经有对象[org.springframework.integration .transformer.MessageTransformingHandler#872] 绑定

我什至尝试为每个使用的组件设置一个 id,它应该用作 beanName ,如下所示:

但是,即使 .filter 等组件的 bean 名称问题得到了解决,我仍然得到关于 MessageTransformingHandler 的相同异常。


更新

我没有提到这样一个事实,即一旦每个IntegrationFlow都完成了它的工作,它就会被IntegrationFlowContext这样删除:

因此,似乎(某种)起作用的是通过使用相同的对象作为锁来同步流注册块和流删除块。

所以我负责注册和删除流的类看起来像这样:

我现在的问题是这种锁对应用程序来说有点重,因为我得到了很多:

这并没有我想的那么快。

但我猜这是意料之中的,因为每次线程获取锁时,注册流及其所有组件或注销流及其所有组件都需要一些时间。

0 投票
1 回答
618 浏览

spring - 为什么在 Spring Integration 中将 ID 和 TIMESTAMP 声明为瞬态标头?

我正在尝试通过 Spring Integration 的 AMQP 输入/输出适配器发送/接收消息,但我遇到了这个问题

在这里找到 Gary 的答案后,我开始调查我的应用是否正确设置了消息 ID。事实上,它在这里被自动处理了。

制作人是这样的。

我故意发送错误消息,在消费者端我看到它的消息转换器在这里失败。

之后,消息会重新排队并无休止地重新处理。

在调试这个问题时,我注意到发送和接收消息的 ID 总是不同的。

在进一步调试此事件后,我了解到 Spring 的核心消息传递框架提供的标准 ID 字段被标记为瞬态,并且瞬态标头永远不会被映射

问题?

  • 为什么框架不能自动映射MessageHeaders.IDAmqpHeaders.MESSAGE_ID
  • 我应该将自定义标头字段作为 ID 并实施MessageKeyGenerator吗?
  • 顺便说一句,我将如何使用新的 Java DSL 来做到这一点?

非常感谢!

干杯,拉斯洛


更新:在无限循环中重新处理失败消息的原因是由其他原因引起的。

defaultRequeueRejected(false)通过添加到 AMQP 入站适配器的侦听器容器配置,我设法解决了这个问题。

0 投票
1 回答
603 浏览

rabbitmq - RabbitMQ 重试最大尝试次数结束后如何触发功能?(Spring 集成 - RabbitMQ 监听器)

我想在 RabbitMQ 侦听器重试结束后触发一封电子邮件,如果处理过程失败仍然如此。

重试逻辑正在使用以下代码。但是一旦最大重试尝试结束,如何触发功能(电子邮件触发)。

添加CustomeRecover的代码

0 投票
1 回答
226 浏览

spring-integration - Spring 集成是否支持 Azure 服务总线?

我需要使用 Azure 服务总线进行消息传递,并想尝试 Spring Integration AMQP 支持。我找不到任何相关的例子或任何暗示这是可能的。

0 投票
1 回答
854 浏览

rabbitmq - 当第二次调用启用发布确认时,Spring amqp 抛出监听器未注册

我一直在尝试让发布者确认打开的 Spring RabbitMQ 发布者。第一次调用正常并返回确认回调。从第二次通话开始,它失败并显示以下错误消息

这就是我的配置的样子

我自动装配rabbittemplate并使用发布者确认回调调用它,如下所示

经过大量调试后,我发现它失败了,因为模板没有注册为新实例中的侦听器,这些新实例PublisherCallbackChannelImpl是由第一个之后的后续调用创建的。我有点不知道这是配置问题还是发布者确认逻辑的问题。对此的任何见解都将非常有帮助。谢谢

编辑我正在使用 spring-amqp 版本 2.0.3-RELEASE 和 spring-messaging 版本 5.0.6-RELEASE

Edit2:第一次和第二次调用的调试日志 https://gist.github.com/jissjanardhanan/cbad51ba77fad3eda484d7c33c0b1517