问题标签 [spring-integration-amqp]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
224 浏览

spring-integration - 为什么带有 Jackson2JsonMessageConverter 的 AmqpChannelFactoryBean 不存储类型?

我正在尝试将 Spring 与 RabbitMQ 集成,使用 RabbitMQ 支持的 Spring 集成通道。(由于某种原因,这似乎几乎没有记录,这是新的吗?)。

为此,我似乎可以使用 AmqpChannelFactoryBean 创建一个频道。要设置消息转换,我使用 Jackson2JsonMessageConverter。

当我使用带有 POJO 有效负载的 GenericMessage 时,它​​拒绝从 Java 反序列化它,主要是因为它不知道类型。我本来希望该类型会自动放在标题上,但标题上只有__TypeId__=org.springframework.messaging.support.GenericMessage.

在 Spring Boot 中,我的配置类如下所示:

发送是这样完成的:

如果我设置自己的类映射器,事情就会正常工作。但是如果我设置这样的东西,我将不得不使用许多 MessageConverters。例如

我用错了吗?我错过了一些配置吗?还有其他建议吗?

谢谢!!:)

注意:看更多的东西,我猜“Spring 集成”的方式是在每一侧添加一个 Spring 集成 JSON 转换器,这意味着每个 RabbitMQ 队列还添加两个额外的直接通道?这对我来说感觉不对,因为那时我有三倍的通道(6!用于输入/输出),但也许这就是应该使用框架的方式?将所有简单的步骤与直接渠道相结合?(在这种情况下,我是否保留了 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道如何工作所固有的?)

我还注意到现在有一​​个 Spring-integration MessageConverter 和一个 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有将对象类型存储在消息头中?

0 投票
1 回答
1302 浏览

java - 如何在 Spring Integration 中动态添加队列以进行交换

我在integrationcontext.xml中有以下交换

我需要能够根据来自数据库的以下对象的channelName值动态地将队列添加到交换中,当有人添加新通道时我应该能够更新:

0 投票
1 回答
119 浏览

java - spring 消息传递 xml 配置的等效 Java 配置

我正在尝试使用rabbitMQ实现spring批量远程分块。我关注这个链接

什么是等效的 java 配置

0 投票
1 回答
203 浏览

java - 在 IntegrationFlow 服务激活器方法成功返回之前不确认 RabbitMQ 消息?

我有一个这样定义的集成流程:

其中的serviceActivatorBean定义如下:

并且requestHandlerRetryAdviceForIntegrationFlow()是这样定义的:

我们面临的问题是当events服务激活器中的集合包含 2 个或更多事件并且由于某种原因处理 myMethod失败和服务器崩溃时。似乎发生的情况是,IntegrationFlow每次从 RabbitMQ 消费和确认一条消息,因此如果服务器在处理myMethod除最后一个事件之外的所有事件期间崩溃,则会丢失。这对我们来说既不好也不安全。我们可以做些什么来配置在服务激活器成功完成IntegrationFlow之前不确认任何消息?myMethod

0 投票
1 回答
739 浏览

java - Spring Integration: get all headers involved in an aggration and not just the last one

I have a Spring integration flow defined like this:

And a service activator defined like this:

What I'm trying to do is to change the myMethod to also take a list of particular headers associated with each message in the aggregation. In my case I'd like to get a hold of the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG for each message so that I can perform an ACK or NACK for each message to RabbitMQ (note that I'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK's/NACK's after myMethod has executed).

I've tried for example this approach:

but here I only seem to get header values for the last message (i.e. channels and tags are always of size 1 even though there are multiple events in the events collection).

I've also tried changing Collection<MyEvent> to Collection<Message> (org.springframework.messaging.Message) in order to try to manually extract the headers but this fails with:

since the message has already been transformed by the message converter defined in the IntegrationFlow.

How can I achieve this?

0 投票
1 回答
391 浏览

java - 当消息数大于并发消费者数时,如何消费 Spring IntegrationFlow 中所需的所有消息?

我有一个这样定义的集成流程:

其目的是将几个以“批次”发送的相关消息分组。这是一个例子:

由于此处描述的原因,我们对 RabbitMQ 使用手动 ack:ing 以避免丢失消息。

集成流程适用于大小为 2 的批次,但一旦批次中有超过 2 条消息,我们就会遇到麻烦:

请注意,记录消息之间的时间大约为 2 秒(即我们已确认为groupTimeout)。

我怀疑这是因为 Spring 消耗了 2 条消息(不是自动 ack:ed),然后聚合等待第 3 条消息(因为batchSize在这种情况下是 3 条)。但是这条消息永远不会在 2 秒的窗口内被消费,因为只有两个并发消费者。

concurrentConsumers计数增加到 3 可以解决此问题特定问题。问题是我们不知道我们收到的批次的大小,它们可能非常大,可能大小为 50 左右。这意味着简单地增加concurrentConsumers不是一个可行的选择。

在 Spring 中处理此问题的适当方法是什么?

0 投票
3 回答
5514 浏览

spring-integration - @MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?

我已经开发了异步 Spring Cloud Stream 服务,并且我正在尝试开发一个边缘服务,它使用 @MessagingGateway 来提供对本质上是异步的服务的同步访问。

我目前收到以下堆栈跟踪:

我的@MessagingGateway:

如果我通过@StreamListener 在回复通道上使用消息,它就可以正常工作:

在生产者方面,我正在配置requiredGroups以确保多个消费者可以处理消息,并且相应地,消费者具有匹配的group配置。

消费者:

制片人:

生产者端处理请求并发送响应的代码位:

我可以调试并看到请求被接收和处理,但是当响应发送到回复通道时,就会发生错误。

为了使@MessagingGateway 正常工作,我缺少哪些配置和/或代码?我知道我正在结合 Spring Integration 和 Spring Cloud Gateway,所以我不确定一起使用它们是否会导致问题。

0 投票
1 回答
126 浏览

java - RabbitMQ + Spring 集成。队列大小 1,仅在覆盖时删除

我想现在是否可以使用 RabbitMQ 和 Spring Integration 来实现这个想法:

  1. 一个队列,可容纳 1 条消息。
  2. 消费者会请求这个消息,如果它存在于队列中,它将被传递给他们,如果不存在,他们会得到一个空或错误。
  3. 此消息(如果存在于队列中)不会因为已经下载而被删除,只有当生产者将另一条新消息放入队列时才会被删除。

此致!

0 投票
1 回答
5317 浏览

spring - Spring集成错误“没有可用的输出通道或replyChannel标头”

我不知道为什么我会得到例外

它只是一个简单的 IntegrationFlow,但不确定我在下面的代码中缺少什么。

我试图接触 Spring Integration,但不知道为什么会出现这个异常。我要做的就是使用 an 从队列中读取inboundAdapter并将其记录到控制台。代码运行良好,但是当我将消息发布到队列时,我得到了这个异常。使用适配器时是否必须指定 areplyChannel或always ?output-channelAmqp

0 投票
1 回答
4922 浏览

spring-integration - 如何在 Spring Cloud Stream 中手动确认 RabbitMQ 消息?

@StreamListener对于基于流的服务,我希望当在 a 中调用的底层服务失败时消息保留在队列中。为此,我的理解是,这样做的唯一方法是配置spring.cloud.stream.bindings.channel_name.consumer.acknowledge-mode=MANUAL.

进行此配置更改后,我尝试将@Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag方法参数添加到我现有的@StreamListener实现中,如https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack中记录的那样。有了这段代码,我遇到了以下异常:

然后我发现了以下内容:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples,它显示了如何使用 Kafka 执行消息确认的示例,但我是目前使用的是 RabbitMQ 绑定。我们计划最终迁移到 Kafka,但是现在,我如何配置和编写解决方案来为成功处理的消息进行手动消息确认和手动消息拒绝,从而在遇到异常时将消息留在队列中。我目前在 Spring CloudEdgware.RELEASE和 Spring Cloud Stream上Ditmars.RELEASE

更新

现在我有以下配置:

我在服务启动时收到以下错误:

什么配置错误/我错过了什么?