问题标签 [rhea]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
118 浏览

openwhisk - 来自 Google Flatbuffers 的 OpenWhisk 和二进制数据

我们有一个模拟设备创建的数据,该设备使用 NanoMSG 与 Google FlatBuffers(二进制)的有效负载放在网络上。

我们希望使用 OpenWhisk 触发此数据的模式,并使用 Flatbuffer 编码的响应进行响应。

假设延迟和吞吐量在这里不是一个大问题。

我们可以采取哪种方法:

  1. 编写一个转发器,将 Flatbuffer 转换为 JSON(FB 有一个实用程序可以做到这一点),然后将数据放到 OpenWhisk 监听的 AMQP 总线上?(我们有熟悉 ​​AMQP 的人,但不熟悉 Kafka

  2. 尝试用 Kafka 做一些事情,这似乎(可能只是 IBM 版本)可以直接处理二进制 Flabuffers(可能仍然需要从 NanoMSG 到 Kafka 的 shim。例如

如何在 Bluemix 中从 IoT 平台调用 OpenWhisk 操作

https://medium.com/openwhisk/serverless-transformation-of-iot-data-in-motion-with-openwhisk-272e36117d6c

不确定我们是否仍然不需要 Flatbuffers JavaScript 反序列化器和序列化器来将 JavaScript 中的二进制 based64 数据转换为 JSON

  1. 学习 Kakfa,然后将 NanoMsg 负载(Flatbuffers 转换为 JSON)。

  2. 还有什么?

有人有这方面的直接经验吗?


更新

谢谢詹姆斯,这些都是现场链接。但它确实提出了一些次要问题:

  1. 如果数据在 Google FlatBuffers 模式中,使用 Kafka 二进制转换似乎没有任何优势,因为来自 base64 的 mux/demux 仍然需要在 javascript 层中完成。
  2. Kafka(以低延迟着称)正在对事件进行批处理,这有点令人不安。当一个人有物联网(传感器数据)需要在执行器(传感器->控制->执行器)的闭环中响应时,这确实会影响延迟,这是一种常见的机器人模型,这与我们的模型非常接近正在做。目前我们并没有推动延迟问题,但我可以看到我们需要低延迟的新兴案例。Kafka Whisk 提供者社区对此有何看法?
  3. 我一定遗漏了一些东西,但 AMQP 提供商说它正在使用 RHEA https://github.com/amqp/rhea#receiver。在编写用于处理传感器流数据的简单触发器/规则方面,这似乎满足了所有需求。为什么要使用 OpenWhisk?
0 投票
1 回答
98 浏览

amqp - 无法打开绑定到应用程序标头 x-match 表达式的接收器

我正在使用 rhea ( https://github.com/amqp/rhea ),一个 node.js 库来开发 AMQP 1.0 客户端。

我正在尝试使用x-match表达式而不是 JMS 表达式来调整https://github.com/amqp/rhea/tree/master/examples/selector示例。

目的是实现基于符合 AMQP 1.0 的代理(ActiveMQ、Qpid、...)的标头路由机制。

我在 recv.js 的相应部分尝试了这段代码:

从 Qpid Java 代理(版本 7.1.0)收到连接错误“预期值类型为 'Filter' 但得到 'String' amqp:decode-error”。

0 投票
1 回答
251 浏览

amqp - 如何使用 rhea 使用 amqp 在 azure 服务总线中实现重新交付延迟

我在 nodejs 应用程序中使用 rhea 使用 AMQP 通过 Azure 服务总线发送消息。我的问题如下:

有时,消息处理尝试可能会因为我们无法控制的事情而失败。例如,对某些 API 的调用可能会因为服务关闭而失败。那时我们解锁消息,以便稍后或由另一个实例拾取它。经过一定数量的重试后(当交付计数达到某个最大值时),它只会在 DLQ 中结束。

我想要实现的是,在每次交付尝试之间会有越来越多的暂停,因此 X 次重试不会只是快速连续发生,直到达到最大值。通过这种方式,如果只是等待某些服务再次可用,我可以给导致失败的任何时间恢复。如果这不起作用,则消息无论如何都可以转到 DLQ。

天蓝色服务总线中是否有一些设置可以实现这一点,还是我必须将它编程到我自己的应用程序中?

0 投票
0 回答
193 浏览

node.js - 我们如何在 node-rhea 中使用单个容器监听多个队列?

我有一个 node.js 应用程序,我在其中使用实现 AMQP 0.9.1的amqplib

我有多个接收数据的队列。

这些队列中的每一个都有一个处理函数,用于处理其各自队列中的消息。

我正在使用具有全新消息处理概念的rhea库升级到 AMQP 1.0。我正在尝试使用每个队列的不同处理程序来处理每条消息,但我无法弄清楚如何在连接/会话中收听特定队列。

我只能像这里所做的那样在容器级别上收听:https ://github.com/amqp/rhea/blob/HEAD/examples/simple_recv.js

我需要在每个队列上打开一个新容器吗?这是唯一的方法吗?

提前致谢。

0 投票
1 回答
555 浏览

node.js - nodejs rhea npm for amqp couldn't create subscription queue on address in activemq artemis

I have an address "pubsub.foo" already configured as multicast in broker.xml.

As per the Artemis documentation:

When clients connect to an address with the multicast element, a subscription queue for the client will be automatically created for the client.

I am creating a simple utility using rhea AMQP Node.js npm to publish messages to the address.

I enabled debug log and while running the client code I see the message like this.

I also tried different variations of the topic, for example, client1.pubsub.foo, pubsub.foo::client1 however no luck from the client code. Please share your thoughts. I am new to ActiveMQ Artemis.

enter image description here

0 投票
1 回答
400 浏览

node.js - NodeJS rhea AMQP 客户端的故障转移

我的机器上运行了多个 ActiveMQ 实例。它们被配置为共享文件系统主从。如果一个 ActiveMQ 服务器宕机,那么应该自动恢复另一个。这按预期工作。

ActiveMQ第一个实例的相关配置:

ActiveMQ第二个实例的相关配置:

我在 NodeJS 中使用 AMQP 连接,通过以下方式使用rhea :

现在,如果 5672 关闭,我的 ActiveMQ 作为主从运行,我希望客户端自动连接到 5673 并继续工作。该检查应连续进行。

这就是它在 Spring-boot 中的实现方式

0 投票
1 回答
26 浏览

javascript - 如何确保 AMQP 消息不会丢失,以防 rhea 订阅者出错?

因此,我在 JS 中使用 rhea 设计了一个基本的发布者-订阅者模型,该模型接受一个 API 请求以将数据保存在 DB 中,然后将其发布到队列中。

从那里订阅者(下面添加的代码)将其拾取并尝试将其保存在数据库中。现在我的问题是这个数据库实例在开发期间经历了很多变化,并且在插入操作期间可能会导致错误。

所以现在当订阅者尝试推送到这个数据库并导致错误时,数据会因为出队而丢失。我是 JS 的新手,所以有没有办法确保消息不会出队,除非我们确定它被正确保存而不必再次发布错误?

我的订阅者的代码:

0 投票
1 回答
101 浏览

node.js - 如何使用 ActiveMQ 仪表板查看消费者的优先级?

从 ActiveMQ 文档(https://activemq.apache.org/consumer-priority)来看,似乎代理确实支持消费者优先级。我正在尝试使用 nodejs 和 rhea 库来使用此功能。问题是,没有明确的文档如何做到这一点——我什至找不到一个地方来查看我是否设法改变了消费者的优先级。仪表板 (localhost:8161) 确实显示了消息的优先级,但如果消费者..

0 投票
1 回答
164 浏览

node.js - 延迟 ActiveMQ AMQP 1.0 的消息传递,_without_ JMS

为了澄清标题:我使用的是 ActiveMQ 5.15.15(不是 Artemis 引擎),并且我使用的是没有官方 JMS 库的 AMQP 1.0。更具体地说,我使用的是 AmazonMQ 版本,它将很快升级到 5.16.2。如果需要,我可以强制升级。

我正在使用一个AMQP 1.0 兼容库(rhea),到目前为止它为我们提供了很好的服务,但我没有找到任何关于如何让 ActiveMQ 的重新交付插件与我的库一起使用的文档。库维护者也不知道它是如何通过 ActiveMQ 公开的。

尽管尝试添加各种标头、传递注释、消息注释或应用程序属性,但我无法让重新传递插件工作。我schedulerSupport="true"的代理元素中有服务器配置。

这些是我尝试过的键,值是数字。例如,30000在允许消费者/订阅者看到队列中的消息之前的 30 秒。我在各种文档中看到了它们,尝试它们并没有什么坏处。

  • AMQ_SCHEDULED_DELAY
  • x-opt-delivery-delay
  • _AMQ_SCHED_DELIVERY

我还release收到了来自客户端的消息,这意味着它无法传递(还将一个值传递给代理失败并增加尝试传递计数)。虽然交付尝试的数量增加了,但延迟和指数退避似乎并没有在经纪人层面发挥作用。

我看到 STOMP 协议在发布时允许使用标头,这允许更清楚地设置选项。但是,除非这样做有意义,否则我不想切换所有内容。

我还看到了另一种通过 REST API 将延迟消息作为主题发送的功能,但我不确定这是否旨在成为生产用例。

所以现在,我要么在看:

  1. 将消息在内存中保留一段时间,并在延迟后尝试重新发布或释放它
  2. 调查STOMP,看看redelivery插件是否适用于它

但我希望有人知道在哪里实现这一点。

我的 redeliveryPolicy 是基本的:

更新

我正在使用 auth 插件,并且有一个条目似乎是用于内置进程的。我认为这来自示例/默认配置。快速搜索似乎没有很多关于此的文档。我可以尝试向其他用户开放访问权限,但在当前设置下,每次更新/重新启动最多可能需要 15 分钟。

评论澄清

  • 我的主要目标是延迟重新传递,因此消费者不会看到被放回队列 n 秒内的失败消息。
  • 我从没有特殊的标题/属性/注释+重新交付插件开始,这也不起作用。