问题标签 [spring-integration-amqp]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring-integration-amqp - 异常时出站 Amqp 适配器回滚消息
我正在使用 spring 集成(带有入站/出站通道适配器)从
- JMS 到 AMQP
- AMQP 到 JMS
- AMQP 到 AMQP
这工作正常。
当目标队列已满时,我想停止该过程,并将消息发送回原始队列(从通道回滚消息)。
这适用于 JMS => AMQP 和 AMQP => JMS,但适用于 AMQP => AMQP。
以下代码工作正常(JMS => AMQP)
在日志文件中,消息很好回滚:
我有类似的 AMQP=> JMS 代码可以正常工作。
但是对于 AMQP => AMQP 我有一个问题,消息丢失并且进程没有停止:
日志文件:
然后重新启动消费者,而不是停止并回滚消息。
然后我的信息丢失了:-(
您对在这种特殊情况下进行预期操作有什么建议吗?我应该添加特定的治疗方法吗?
spring - 当从 amqp 转换消息发生错误时,弹簧整数 amqp 不会重试
弹簧整合 amqp 版本:5.0.11
目标:当发生致命异常时,消息将被丢弃。但不是致命的,消息将重新排队并执行重试策略。
但就我而言,我有一个自定义消息转换器,当我的转换发生一些非致命错误时,它将始终重新排队并且永远不会进入重试策略。
我尝试阅读代码,AmqpInboundChannelAdapter.Listener#onMessage
在重试之前,它转换消息,这意味着当消息转换发生一些错误时,它不会重试,将进入错误处理程序。
我的代码如下:
我该如何解决这个问题,谢谢。
spring-batch - Spring Batch Integration 消费 RabbitMq 消息
我目前正在使用 IntegrationFlow 在 RabbitMq 消息到达队列时触发作业执行。IntegrationFlow 的 AmqpInboundChannelAdapter 和作业的第一步的 ItemReader 都配置为从同一个队列中读取消息。
我遇到的问题是 IntegrationFlow 的 AmqpInboundChannelAdapter 读取 RabbitMQ 消息,然后 ItemReader 无法再找到该消息。可能是因为 IntegrationFlow 在 Job 启动之前确认了消息。
有没有办法阻止 IntegrationFlow 消费/确认消息,将其留在队列中,以便 ItemReader 可以作为预期工作?我尝试配置 AmqpInboundChannelAdapter 以重新排队消息,但这只会导致适配器重新读取它自己的消息的无限循环。
这个问题在一定程度上描述了我的问题,除了我没有做任何处理,我只是试图将 IntegrationFlow 用作 JobLaunching 触发器。因此,该解决方案似乎是一种反模式。
Spring Batch Integration - 传递数据 b/w 集成和批处理
任何帮助将不胜感激
spring-integration - 放慢入站通道
我们使用 spring 集成将消息从一个代理 (amqp | jms) 传输到另一个代理。
我们希望减慢来自输入通道的消息的消耗:<int-amqp: inbound-channel-adapter>
或者 <int-jms: message-driven-channel-adapter
(例如通过添加节奏)以免使远程队列饱和,给它处理消息的时间。
另一个约束是尊重输出队列上消息的顺序。
感谢您的建议
问候,
埃里克
logging - 在 Spring Integration 的 Amqp inboundAdapter 中使用 MarshallingMessageConverter 时记录 XML 有效负载
我有IntegrationFlow
它监听 AMQP 消息,我想记录 XML 消息有效负载。
当我使用MarshallingMessageConverter
in 时,在步骤Amqp.inboundAdapter()
中记录了已经反序列化的对象而不是 XML 有效负载.log()
我可以通过默认SimpleMessageConverter
和显式.transform()
步骤解决这个问题。
有没有办法记录原始 XML 有效负载并继续使用MarshallingMessageConverter
?
spring-integration - 检测 RabbitMq 连接上的异常
使用 spring 集成将消息从 RabbitMQ 传输到 MQ 效果很好。
如果我停止 RabbitMQ 服务器,那么我们的日志文件就会出错:
我们如何拦截这个异常?
ExceptionListener
添加jms时效果很好DefaultMessageListenerContainer
按照bean的配置:
编辑1
正如你建议我的那样,我使用这样的 bean 的定义:
随着ConnectionAmqpListener.java
这很好用,当我停止代理时,onShutDown
会调用该方法。
但是,如果我重新启动我的进程(代理关闭),我在日志文件中没有任何消息并且进程停止。
如果连接失败,您对如何获取信息有任何建议吗?
结束编辑1
谢谢你的帮助
问候,
埃里克
spring-integration - Spring Integration - http outboundAdapter 中的控制重试逻辑
我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(原因 java.net.UnknownHostException)时,我无法控制重试。
即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑,这似乎会生成无限重试(RabbitMQ 容器上未确认消息)。
我的目标应该是:在http出站适配器出错的情况下将消息重新排队N次,否则丢弃消息并且不要再次重新排队。
代码在这里:
Spring集成路线
AmqpInboundChannelAdapterSMLCSpec
有任何想法吗?
非常感谢
spring - 使用 Spring 集成的异步 RabbitMQ 通信
我有两个使用 RabbitMQ 进行通信的 Spring Boot 服务。Service1 向 Service2 发送会话创建请求。Service2 处理请求并应返回响应。Service1 应该处理响应。
Service1 请求会话的方法:
在 Service1 上,我定义了 AsyncOutboundGateway,例如:
在 Service2 上,我有接收这些消息的流程:
Service2 处理那里的请求:
问题是 Service2 立即将 ListenableFuture 作为消息负载返回给 Service1,而不是等待未来的结果并发回结果。
如果我通过将 async 参数设置为 true 来正确理解文档文档@ServiceActivator
,则应该返回成功的结果,如果出现异常,将使用错误通道。
可能我误解了文档,因此我需要在 Service2 流中解压缩 ListenableFuture,然后再将其作为响应返回,但我不确定如何实现。
我尝试了一些publishSubscribeChannel
但没有太多运气的东西。
spring-batch - 如何在 Spring 集成中使用控制总线从另一个 JVM 启动/停止集成组件
我们正在使用 spring 集成/spring 批处理作业在连接到公共 ActiveMQ 队列的多个 jboss 节点中运行作业。对于监听队列/通道的入站网关,我们已将自动启动配置为 false。自动启动的问题是,如果入站网关具有更多并发消费者属性,那么即使在需要这些消费者线程的作业频率非常少的情况下,在服务器启动期间启动的所有节点中也会有这么多数量的消费者线程处于活动状态。在这种情况下,春季批处理作业在 4 个节点之一中触发。我们可以在作业启动时利用控制总线启动入站网关组件,它适用于同一个JVM。但是在工作节点中(运行在不同的 JVM 中),
spring-integration - Spring集成amqp重复添加标头
很长一段时间以来,我们一直在使用 spring 集成核心和 spring 集成 amqp 以及 rabbitm-mq。我们的服务还通过使用 x-death 标头和 amaqp-expiration 标头使用死字机制。它曾经可以正常工作,直到我们决定升级 spring-integration 的版本。
以前的版本:5.0.6.RELEASE
新版本:5.2.4.RELEASE
以前版本中的 Rabbit mq 标头
新版本中的兔子 mq 标头
基本上消息头的大小变得如此之大以至于超出了限制。在 5.0.6 版本中不会发生
更新 :
我们放置了一个自定义键,例如 original-expiration,它的值来自 x-death 标头
然后我们使用 header-enricher 来丰富 amqp-expiration 标头