问题标签 [node-amqplib]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
1025 浏览

node.js - RabbitMQ 重新连接逻辑不适用于 RabbitMQ 服务器的突然重启

我的节点应用程序和RabbitMQ服务器正在docker上运行。
该nodejs应用程序具有重新连接逻辑,当我通过killRabbitMQ容器中的命令优雅地重新启动RabbitMQ服务或sudo docker-compose restart -t 10 rabbitmq等待10秒后再向killRabbitMQ服务发送强制信号时,该逻辑非常有效。

但是当我强行重启 RabbitMQ 服务时,sudo docker-compose restart -t 0 rabbitmq重新连接逻辑不起作用。lsofnodejs 应用程序容器上的命令不显示任何连接。

即使在 RabbitMQ 服务强制突然重启的情况下,应该有什么正确的方法让它工作?

0 投票
1 回答
508 浏览

node.js - 如何从 channel.consume 中提取味精

我已成功使用来自 rabbitmq 的消息,如果我添加 console.log(msg),我可以看到该消息,但问题是我无法在 channel.consume 之外获取消息

我试图将它分配给变量,但它仍然没有工作

我期望代码在消费部分之外打印 messageString console.log(messageString);

0 投票
0 回答
1359 浏览

node.js - 如何以正确的方式将 rabbitMQ 实现到 node.js 微服务应用程序中?

问候 Stackoverflow。多年来我一直在使用 stackoverflow 来寻找答案,这是我第一次尝试自己提出问题。所以如果我做错了,请随时告诉我。

目前我正在开发一个基于微服务架构的数据分析系统。假设该系统将由十几个自给自足的微服务组成,这些微服务通过 RabbitMQ 相互通信。它们中的每一个都封装在一个 docker-container 中,整个系统在生产中由 docker-swarm 提供支持。

特别是每个微服务都是一个 node.js 应用程序和相关的数据库,与一些 ORM 接口连接。它的任务是以 CRUD 方式管理和服务数据,并根据包含的数据提供一些准备好的查询的结果。没什么特别的。

为了提供微服务-微服务通信,我假设使用amqplib。但是使用它的方式还不确定。

我目前的问题是如何以 OOP 的方式利用 amqplib 将微服务间通信网络与应用程序的对象相关功能链接起来?通过 OOP 方式,我的意思是从长远来看可以替换 amqplib(和 RabbitMQ 本身)而无需更改与数据相关的逻辑。

我真正寻找的是当前正在使用 AMQP 的微服务应用程序的示例。如果有人可以提供链接,我将非常感激。

我的问题的第二部分。基于事件驱动的主体构建微服务应用程序,并将消息从 RabbitMQ 传递到应用程序的主事件队列是否有意义?这样每个过程都将以相同的方式调用,尽管它是内部或外部事件。

至于单个微服务的抽象示例:假设我有一个事件服务和一个连接到该服务的侦听器:

另一方面,我有 RabbitMQ 连接,正在等待消息:

我目前的想法是仅解析此消息并将其转发给 eventService 并将其路由键用作事件的名称,如下所示:

但是 RPC 呢?我是否应该用另一种方法为他们进行另一次交流甚至是一个渠道,例如:

然后我的 User 类应该在每次创建新用户时触发 'users.user.create-response' 事件。这不是拐杖吗?

0 投票
1 回答
1334 浏览

javascript - 兔MQ。循环发布消息

我正在使用amqplib js 库。我有发布者(它循环发布消息)和少数工作人员。如果我发布 1000000 条消息,我会看到我的消息如何首先发送给 rabbit,然后我收到ack,并且只有在这条消息开始在 worker/workers 中消费之后。

据我了解,当我发送消息时,兔子无法向发布者发送ack。我对吗?我该如何解决这个问题?

我有主文件:

简单的发布者:

和简单的工人:

工作成果: 我不知道它是否重要,但我说。我使用这个 rabbit cluster,并在其中启用了一项策略: 10 000 条消息1 000 000 条消息

我还想,频道中有一些问题,我又添加了一个测试:

但结果是一样的: img2

0 投票
1 回答
1323 浏览

node.js - Node/RabbitMQ - 向 nodejs 路由发送消费者响应

我正在通过 RabbitMQ 处理 nodejs 请求。我的生产者通过 nodejs 路由接收请求并将它们发送给消费者,然后消费者通过从请求中接收到的数据在数据库中创建一个文档。这是我的路线

这是我的制片人课程

还有我的消费者

我想将从消费者创建的用户的 Json 文档发送到路由,以便客户端可以获得创建的用户作为响应。我怎样才能做到这一点,我做错了什么?

0 投票
1 回答
859 浏览

node.js - 节点 RabbitMQ 消费消息并为每条消息做一些事情

我想使用来自 rabbitmq 服务的消息,并且对于我收到的每条消息,我都想做一些事情(例如:将该消息放入数据库,处理消息并通过 RabbitMq 通过另一个队列发送回复)每条消息。

目前我的 RabbitMq 消费者代码如下:

0 投票
1 回答
1080 浏览

node.js - 通过 NodeJS 从 RabbitMQ 获取交换、绑定、队列的列表

有时需要通过 NodeJS 在 RabbitMQ 上重新创建绑定和队列。我需要以编程方式解除绑定、清除和删除它们。我可以保存我所做的并在以后恢复。我使用 amqplib。它混淆了它不提供可以列出它们的功能。http://www.squaremobius.net/amqp.node/channel_api.html 有可能吗?它不存在的原因是什么?

0 投票
1 回答
274 浏览

node.js - 如何在 Node.js 中构建一个类来抽象 RabbitMQ 和 amqplib 功能

我正在尝试构建一个小型库来抽象 amqplib 与 RabbitMQ 通信所需的一些样板。我正在使用 promises api 和 async/await 语法。我正在尝试使用一些方法构建一个类,以与其他几个服务器和客户端一起使用。我在网上搜索过,绝大多数例子都是直接的、小规模的教程。

这是我目前对messages.js的了解:

这是 setup.js 的代码:

我尝试发送消息时收到的错误是“TypeError:无法读取未定义的属性'sendToQueue'。” 显然通道属性没有被正确初始化。我将 async/await 包含在 try/catch 块中并得到相同的错误。

关于 Node.js 中的类/方法,我有什么遗漏吗?

我认为这与承诺的解决有关。当我将对 sendToQueue() 的调用移至 setupConnection() 方法时,将发送消息。

所以看来我需要找到一种方法让 send 方法等待 setup 方法的解析。

0 投票
0 回答
67 浏览

rabbitmq - Npm amqplib rabbitmq ackall() 与 ack()

我是rabbitmq的新手。我了解我需要确认消息才能从队列中删除已使用的消息。我的问题是我应该使用 ack() 还是 ackall()?从文档看来,ackall 正在消除队列中的所有消息,甚至是尚未使用的消息。那是对的吗?

0 投票
1 回答
3067 浏览

node.js - 在 nodejs 上使用 protobufjs 使用来自 .net 的 protobuf 消息时出现无效的线路类型和索引超出范围错误

我正在尝试在节点 js 上使用来自 RMQ 的 protobuf 消息。protobuf 消息是使用 C#.Net 上的 protobuf-net 创建的

因此,例如 c# 对象如下所示:

然后它被添加到 RMQ 中,我们在另一端使用具有相同对象的 .net 侦听器对其进行解码。

但是现在我们想从 nodejs 读取消息。为此,我在 nodejs 端使用 amqplib 和 protobuf-js。

我试图使用带有装饰器的对象来解码消息,如下所示:

像这样解码:

其中 ch 是 amqplib RMQ 通道。

但我总是遇到以下错误之一:

偏移 2 处的无效导线类型 7

偏移 2 处的无效导线类型 4

偏移 2 处的无效导线类型 6

超出范围的索引:237 + 10 > 237

ETC

我究竟做错了什么?

编辑:

看起来我没有考虑 MessageBase(PositionOpenNotification 继承的抽象)也是一个 ProtoContract 并且数据是用长度前缀序列化的事实。

所以最后这是有效的:

添加一个带有 PositionOpenNotification 对象的 MessageBase 对象:

然后在反序列化它时: