问题标签 [spring-rabbit]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3973 浏览

rabbitmq - 如何根据条件限制并发消息消耗

场景(我已经简化了事情):

  • 许多最终用户可以从前端 Web 应用程序(生产者)开始工作(繁重的工作,例如渲染大 PDF)。
  • 作业被发送到单个持久 RabbitMQ 队列。
  • 许多工作应用程序(消费者)处理这些作业并将结果写回到数据存储中。

这种相当标准的模式运行良好。

问题:如果一个用户在同一分钟内启动了 10 个作业,并且在一天中的那个时间只有 10 个工作应用程序启动,那么这个最终用户实际上是在为自己接管所有的计算时间。

问题:如何确保每个最终用户在任何时候都只处理一个作业?(奖励:不得限制某些最终用户(例如管理员))

此外,我不希望前端应用程序阻止最终用户启动并发作业。我只希望最终用户一次完成一项并发作业。

解决方案?:我应该为每个最终用户动态创建一个自动删除的独占队列吗?如果是,我如何告诉工作应用程序开始使用此队列?如何确保一个(并且只有一个)工作人员会从此队列中消费?

0 投票
1 回答
3011 浏览

spring-mvc - spring REST消息总线通信

我正在寻找可以帮助我将 spring REST Web 服务与消息总线(RabbitMQ)集成的 spring 模块。REST Web 服务充当来自客户端的 AMQP 消息的消费者。每当通过总线发送消息时,它就是一条 AMQP 消息,并且要使其与 REST 一起使用,必须将其转换为 REST 调用。有谁知道使其工作的现有解决方案?

0 投票
1 回答
490 浏览

java - Spring AMQP 异常

这是我在stackoverflow中的第一个问题。我正在从事一个涉及 Spring AMQP 的项目。对于开发,我找到了本教程:Spring AMQP 示例

当我运行 ListenerContainer 类时,我得到了这个异常:

  1. 原因:org.springframework.amqp.AmqpIllegalStateException:未指定默认侦听器方法:为“defaultListenerMethod”属性指定非空值或覆盖“getListenerMethodName”方法。

我一直在寻找解决方案,但我无法解决。

如果有人可以帮助我,我将不胜感激。

0 投票
1 回答
1075 浏览

rabbitmq - java.lang.ClassCastException:JedisConnectionFactory 无法转换为 ConnectionFactory

我不确定为什么方法“connectionFactory”返回 JedisConnecionFactory 的一个实例,它不应该是 org.springframework.amqp.rabbit.connection.ConnectionFactory 吗?

“new RabbitTemplate(connectionFactory())”行出现以下异常(参见下面的代码)

@配置类

0 投票
1 回答
548 浏览

java - Spring-AMQP Transactionnal 发布无异常

我正在尝试将 Transactionnal RabbitMQ 通道与 Spring-AMQP 一起使用,但我想真正吞下异常来记录它们并能够恢复它们。

使用 channelTransacted=true 会强制 Channel 也加入当前 transactionManager(在我的情况下为 Hibernate),这会导致提交异常被重新抛出 @Transactionnal 边界,导致上层失败而无法捕获并记录它。

我还尝试手动将发布附加到事务,以便仅在提交成功后执行:

以这种方式使用:

但在那种情况下,我不能使用 channelTransacted=true 因为它会将 registeringSynchronization 嵌套在另一个 registeringSynchronization 中并且根本无法调用......

有没有办法做到这一点?

更新:理想情况下,我想覆盖 ConnectionFactoryUtils 类中使用的 RabbitResourceSynchronization,但它是一个没有工厂实例化的私有类

0 投票
1 回答
3548 浏览

java - RabbitMQ 通道最佳实践

我正在创建一个 REST api 来向 RabbitMQ 发送消息,并试图了解创建/关闭通道的最佳实践。我正在使用 RabbitMQ Java 客户端 API。

目前我有一堂课RabbitMQPublisherConnection,我在其中注入 RabbitMQ 连接。然后这个类被弹簧注入另一个类RabbitMQPublisherChannel。此类具有以下功能来创建通道:

现在我有第三堂课RabbitMQPublisher,我在那里注射RabbitMQPublisherChannel课程。我的应用程序上下文如下所示:

该类RabbitMQPublisher具有向 RabbitMQ 发布消息的功能:

这个应用程序是通过 tomcat 运行的,我注意到 AppDynamics 关闭通道花费了发布消息总时间的 47%。当我删除关闭通道的调用时,我节省了这 47% 的时间,这就像 32 毫秒,但是我在我的 RabbitMQ 管理控制台中注意到该连接的通道数量一直在增加。

所以我的问题是——

  1. 假设tomcat每秒会收到多个请求,这是在每次发布后打开和关闭频道的好习惯吗?
  2. 这是一个在多个线程之间共享通道池的好习惯(RabbitMQ 推荐但也说Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.)这是否意味着为每个线程创建一个新通道?
  3. 不关闭通道并通过空闲的 RabbitMQ http api 清理通道是否是一个好习惯。(请不要推荐这个)?
  4. 节省 32ms 值得吗?

谢谢

0 投票
1 回答
783 浏览

spring - 春季集成rabbitmq集群应用程序中所有订阅者未收到消息

我正在使用 Spring Integration、Websocket 和 RabbitMQ 服务器开发一个演示聊天应用程序。当我在单个服务器上执行应用程序时,它工作正常。

生产者发送的所有消息都被消费者接收。但是,当我在集群环境中运行它时,消息会在服务器上随机接收,而不是由所有服务器接收。

我不知道我的代码是否有问题,或者是导致它的配置。

我试图通过记录器检查它。记录器向我显示消息已成功发送,但并非所有服务器都接收到,而是仅由一台服务器接收。

以下是我与配置一起使用的类。

聊天控制器.java

}

RandomDataGenerator.java

}

webapp-config.xml

代理服务器配置

以下是示例应用程序的链接,您可以测试并尝试问题的原因:

演示应用

0 投票
1 回答
2670 浏览

java - spring-amqp 具有不同routingKey的多个队列

我最近开始学习 Spring 和 spring-amqp,所以这个问题可能看起来很基础,所以请原谅。

我有多个队列,它们位于不同的主机上,并且具有不同的 QueueName、RoutingKey、vhost、用户、密码。我正在为这些队列编写发布逻辑,但无法决定是否应该为每个队列设置一个配置类,或者是否可以在 XML 中完成。

创建一个类以获取有关队列的所有信息(主机、虚拟主机、用户名等)的方法工作正常,如本示例中所述。我创建了一个 @Configuration 类并为该队列定义了所有 bean。但是我需要做

所以我的要求是:

  1. 由于我有许多队列需要在应用程序启动时实例化,因此一旦 tomcat 启动到队列/兔子集群的连接/通道,就应该建立。
  2. 然后,一旦我的应用程序收到 POST 请求,我需要根据 POST 参数将消息发布到其中一个队列。

所以对于每个队列我总是需要做:

或者有没有办法让 Spring 加载我所有的 Queue 配置类并使用如下对象:

  1. 那么我怎样才能得到确切队列的 amqpTemplate 而不是new AnnotationConfigApplicationContext()每次都做呢?
  2. 每次请求到达我的服务时执行 new AnnotationConfigApplicationContext 有什么害处?[我猜为每个请求创建一个新对象不是一个好主意]
0 投票
0 回答
659 浏览

rabbitmq - RabbitMQ - 高可用性队列

我有一个 RabbitMQ 集群,它有两个节点和一个监听器(两者都连接),它声明了一个队列(与主题交换绑定)。队列被声明(显然在节点 1)并复制到节点 2(如预期的那样)。到目前为止,一切都很好。

但是当节点 1 出现故障时,队列从节点 2 中删除,使我的侦听器崩溃。这是队列参数:

  • 排他=“假”
  • 耐用=“真”
  • 自动删除=“假”

显然,这似乎不是正确的高可用性行为。问题是如何创建这个队列并在至少 1 个节点启动时保持它可用(无论哪个节点)?

0 投票
0 回答
519 浏览

spring-amqp - Java 8 支持 Spring AMQP 中的匿名队列吗?

匿名队列的使用似乎在 Java 8 中被打破了。使用 Java 7 运行正常。问题似乎出在AmqpAdmin.declareQueue(). 此调用导致连接异常。我有一个项目在这里演示它。

环境:

  • macOS 优胜美地
  • Java 1.8.0_20
  • Spring AMQP 1.4.3(为方便起见加上 Spring Boot)

** 更新 **

似乎连接localhost是 Java 8 的问题 - 当我覆盖默认设置ConnectionFactory并创建自己的并明确将主机设置127.0.0.1为连接被拒绝时,错误消失了。与 Java 的处理必须有一些区别,InetSocketAddress因为异常发生在 Spring AMQP 的FrameHandlerFactory第 32 行,它尝试使用主机名(localhost之前的)创建连接。我用更改更新了 github 项目。