问题标签 [node-amqplib]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
157 浏览

node.js - 有一个消费者监听多个队列,但它应该在队列 B 之前消费队列 A 中的所有消息

我是 RabbitMQ 的新手,我仍在学习它是如何工作的。到目前为止,我设法创建了两段代码(使用amqplibNode.js 的包):将消息发送到队列 A 和队列 B 的发布者;以及一个消费队列 A 和队列 B 的消息的消费者。

但是,队列 A 应该比队列 B 具有更高的优先级,所以我想创建我的消费者,它会同时监听队列 A 和队列 B,但它会首先使用队列 A 中的所有消息,并且仅在之后队列 A 为空,它将消耗队列 B 中的消息。

是否有可能做到这一点?

0 投票
1 回答
375 浏览

node.js - RabbitMQ 数据在崩溃时丢失

我正在使用 RabbitMQ 来存储和检索数据。我提到了这篇文章。我已将durable标志设置为 true 并将noAck标志设置为 false(即使在消费后我也需要将消息存储在队列中)。

我创建了这些场景:

我在消费者关闭状态(非活动)的情况下更新了 3 次库存数据。然后我激活了消费者。它消耗了队列中的所有三个消息。[效果很好。]

现在我再次产生了三条消息(消费者再次处于非活动状态),然后我关闭了 rabbitmq 服务器。当我重新启动服务器并激活消费者时。它似乎没有消耗数据(队列中的消息是否已丢失?)

消费者 :

制片人:

他们不是很执着吗?如果服务器崩溃,队列中的所有消息都会永远消失?

服务器崩溃时如何检索队列中的数据?

提前致谢。

0 投票
1 回答
908 浏览

node.js - 如何在rabbitmq(rascal.js)上管理每个请求的发布连接

我在 node.js 应用程序上使用 Rascal.Js(它使用 amqplib)作为我的消息传递逻辑和 rabbitMq。

我在我的项目启动中使用了与他们的示例类似的东西,它创建了一个永久实例并“注册”我的所有订阅者并在它们到达队列时重定向消息(在后台)。

我的问题与出版商有关。有来自外部的 http 请求应该会触发我的发布者。用户单击各种创建按钮,这会导致某些操作流。在某些时候,它达到了我需要使用发布者的地步。

在这里,我不确定正确的方法。每次需要发布消息时都需要打开新连接吗?并在结束后关闭它?或者也许我应该以一种为所有发布者保持相同连接打开的方式来实现它?(实际上我不太确定如何以可以从我的应用程序的其他部分访问它的方式创建它)。

目前我正在使用以下内容:

当我需要发布消息时,我会在应用程序的不同部分使用此功能。我想只为所有端点添加 broker.shutdown() 但在发生错误后的某个时候,我收到一个关于关闭已经关闭的连接的异常,这让我担心关闭方法(这可能不是一个好的)。我认为这与此有关-我尝试这样做(注释代码),但我认为它在某些情况下效果不佳。如果一切正常,它会“成功”,然后我可以关闭它。但是有一次我遇到了错误而不是成功,当我尝试使用 broker.shutdown() 时,它给了我另一个导致应用程序崩溃的异常。我认为这与此有关 - https://github.com/squaremo/amqp.node/issues/111

我不确定解决此问题的最安全方法是什么?

编辑:

实际上现在我想起来了,这个异常可能与我试图关闭 catch{} 区域中的代理有关。我会继续调查。

0 投票
1 回答
1077 浏览

node.js - RabbitMQ 队列未被多个消费者服务并行消费

我正在研究 NodeJS 和 RabbitMQ,其中 -

  • 主 NodeJS 服务(一个实例)将数据推送到队列中。
  • 从属 NodeJS 服务(多个实例)使用数据并对其进行处理。
  • 正在使用默认交换。

从服务在 PM2 集群模式下运行,这意味着我有 8 个从服务运行的实例。

我的期望是,当主服务开始通过队列推送数据时,从服务应该开始异步消费它们。

例如,如果主服务正在通过队列推送 10 个作业,并且每个作业需要 5 秒才能完成,那么从属服务需要 50 秒才能完成该作业。

这完全违背了使用多个奴隶的目的,因为我通常希望奴隶一次接 8 个工作。

根据 RabbitMQ 仪表板,上述设置创建 -

  • 9 个连接(1 个主站 + 8 个从站)
  • 9通道(1主+8从)
  • 1 个队列

整个设置使用默认交换。

我的问题是——

为什么从站不能异步从队列中读取数据?

即使我设置noAcktrue队列中的下一个项目,在处理当前项目之前不会被拾取

我的意图是通过用户多个从属实例来扩大队列消耗率,但我认为我在这里遗漏了一些东西。

这是代码库 -

从机输出 -

如果您注意到,不同的从站正在以循环方式接收消息,但它们是同步工作的。

谢谢!

0 投票
1 回答
389 浏览

python - 使用 RabbitMQ 时如何同步 Node.js 和 Python 即在 worker 上运行任务,等待结果并将其发送到客户端

我有一个快速的 Node.js 应用程序和一个使用 python 的机器学习算法。例如,我正在使用 RabbitMQ 来集成 Node.Js 和 Python,并且它可以工作,我的意思是,这比使用带有 spawn 的库 child_process 更具性能。但是我在同步工人对他们各自请求的响应时遇到了麻烦。

我的代码类似于下面的示例(玩具示例)并基于这篇文章1。这个实现有两个主要问题,它向客户端(邮递员)发送错误的答案或者没有完成请求。

这段代码应该接受来自客户端的请求(图像和类型),将此任务放入队列(任务队列),等待工作人员完成其工作并将结果发送给客户端(正确的)。

此代码应从队列(任务队列)中获取任务,运行它们并将结果放入结果队列。

如何解决这些问题?

0 投票
1 回答
480 浏览

node.js - 使用 amqplib 设置连接名称

我需要为我的连接设置一个友好名称,如下所示,而不是“?” 在用于 NodeJS 的 amqplib 的 RabbitMQ 中:

在此处输入图像描述

我找到了 Java 和 Python 的例子,但这个库还没有。谢谢。

0 投票
1 回答
280 浏览

node.js - 在重新连接时使用 node-amqplib 取消订阅特定队列

问题:远程系统重新连接到多个节点 websocket 服务器,为每个系统创建/使用 RabbitMQ 中的专用队列。如果不存在活动连接,则应自动删除队列。Websocket 连接/断开事件处理程序是异步的,相当繁重,观察到的问题是断开事件处理程序在重新连接后完成,导致系统不一致。

主要问题是 RabbitMQ 队列 - 最初的解决方案是为每个连接创建唯一的队列并在断开连接时删除它们。显得很重。

第二种方法是为每个远程系统保留一个专用队列(任何连接的队列名称相同),问题是 assertQueue 为同一个队列添加了消费者。需要找到在不删除队列本身的情况下删除陈旧队列消费者的方法。

0 投票
1 回答
79 浏览

rabbitmq - 工人可以根据主题消费吗?

我知道您可以使用 将消息直接发送到队列channel.sendToQueue,这会产生任务和工作人员的情况,其中只有一个消费者将处理每个任务。

我也知道您可以使用channel.publish基于主题的交换,并且消息将根据路由键路由到队列。不过,据我了解,这将始终广播给任何匹配队列上的所有订阅者。

我基本上想使用基于主题的交换,但每个任务只有一个消费者处理。我已经阅读了文档,但我没有看到这样做的方法。

我的用例:

我在多个位置设置了微服务实例。可能有两个在加利福尼亚,三个在伦敦,一个在新加坡,等等。创建任务时,唯一重要的是它由给定位置的一个实例处理。

当然,我可以创建数百个名为“usa-90210”、“uk-ec1a”等的队列。看起来使用主题会更干净。考虑到使用通配符的能力,它也会更加灵活。

如果这不是 RabbitMQ 的功能,我也愿意接受其他想法或想法。

更新

根据 istepaniuk 的建议,我尝试创建两个工作人员,每个工作人员将自己的队列绑定到交换:

不幸的是,两个消费者仍在接收消息。

0 投票
1 回答
87 浏览

node.js - 调试库不能与 amqplib (rabbitmq) 一起使用

我们以两种“模式”运行我们的应用程序:

  1. 我们的 REST API (express js)
  2. 我们的后台处理器 (amqplib)

我们开始使用 nodemon 的 REST API 在debug下运行得很好,但是我们的后台处理器不能在 debug 下运行。

我们DEBUG=app:*在 .env 文件中声明,是的,我们在控制台日志中确实看到了它,但是由于某种原因,当我们执行以下操作时,在运行我们的后台处理器时没有任何报告。

我们确实看到 amqp 的依赖项之一称为bitsyntax使用调试。我想知道它是否可以关闭调试但在他们的代码中找不到任何这样做的东西。

我能做些什么来解决这个问题吗?

我们使用以下命令运行我们的后台处理器: NODE_ENV=development nodemon ./src/workers/index.ts

0 投票
0 回答
161 浏览

node.js - rabbitMQ 重启后使用 amqplib 重新连接 RabbitMQ

我有这个 rabbitmq 连接代码,我使用它连接到队列并使用它。

现在这工作正常,但只要我在终端上运行这个命令,

brew services restart rabbitmq

这基本上是重新启动rabbitMQ,我收到这个心跳错误,然后我无法再次连接到rabbitMQ。这种情况不断发生。我还尝试将 try catch 放入connect()函数中,但它没有被捕获。我得到的错误是:

有什么想法吗?如果有办法可以捕获此异常,那么我可以从那里再次调用 connect 。