问题标签 [rhea]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
openwhisk - 来自 Google Flatbuffers 的 OpenWhisk 和二进制数据
我们有一个模拟设备创建的数据,该设备使用 NanoMSG 与 Google FlatBuffers(二进制)的有效负载放在网络上。
我们希望使用 OpenWhisk 触发此数据的模式,并使用 Flatbuffer 编码的响应进行响应。
假设延迟和吞吐量在这里不是一个大问题。
我们可以采取哪种方法:
编写一个转发器,将 Flatbuffer 转换为 JSON(FB 有一个实用程序可以做到这一点),然后将数据放到 OpenWhisk 监听的 AMQP 总线上?(我们有熟悉 AMQP 的人,但不熟悉 Kafka
尝试用 Kafka 做一些事情,这似乎(可能只是 IBM 版本)可以直接处理二进制 Flabuffers(可能仍然需要从 NanoMSG 到 Kafka 的 shim。例如
如何在 Bluemix 中从 IoT 平台调用 OpenWhisk 操作
不确定我们是否仍然不需要 Flatbuffers JavaScript 反序列化器和序列化器来将 JavaScript 中的二进制 based64 数据转换为 JSON
学习 Kakfa,然后将 NanoMsg 负载(Flatbuffers 转换为 JSON)。
还有什么?
有人有这方面的直接经验吗?
更新
谢谢詹姆斯,这些都是现场链接。但它确实提出了一些次要问题:
- 如果数据在 Google FlatBuffers 模式中,使用 Kafka 二进制转换似乎没有任何优势,因为来自 base64 的 mux/demux 仍然需要在 javascript 层中完成。
- Kafka(以低延迟着称)正在对事件进行批处理,这有点令人不安。当一个人有物联网(传感器数据)需要在执行器(传感器->控制->执行器)的闭环中响应时,这确实会影响延迟,这是一种常见的机器人模型,这与我们的模型非常接近正在做。目前我们并没有推动延迟问题,但我可以看到我们需要低延迟的新兴案例。Kafka Whisk 提供者社区对此有何看法?
- 我一定遗漏了一些东西,但 AMQP 提供商说它正在使用 RHEA https://github.com/amqp/rhea#receiver。在编写用于处理传感器流数据的简单触发器/规则方面,这似乎满足了所有需求。为什么要使用 OpenWhisk?
amqp - 无法打开绑定到应用程序标头 x-match 表达式的接收器
我正在使用 rhea ( https://github.com/amqp/rhea ),一个 node.js 库来开发 AMQP 1.0 客户端。
我正在尝试使用x-match表达式而不是 JMS 表达式来调整https://github.com/amqp/rhea/tree/master/examples/selector示例。
目的是实现基于符合 AMQP 1.0 的代理(ActiveMQ、Qpid、...)的标头路由机制。
我在 recv.js 的相应部分尝试了这段代码:
从 Qpid Java 代理(版本 7.1.0)收到连接错误“预期值类型为 'Filter' 但得到 'String' amqp:decode-error”。
amqp - 如何使用 rhea 使用 amqp 在 azure 服务总线中实现重新交付延迟
我在 nodejs 应用程序中使用 rhea 使用 AMQP 通过 Azure 服务总线发送消息。我的问题如下:
有时,消息处理尝试可能会因为我们无法控制的事情而失败。例如,对某些 API 的调用可能会因为服务关闭而失败。那时我们解锁消息,以便稍后或由另一个实例拾取它。经过一定数量的重试后(当交付计数达到某个最大值时),它只会在 DLQ 中结束。
我想要实现的是,在每次交付尝试之间会有越来越多的暂停,因此 X 次重试不会只是快速连续发生,直到达到最大值。通过这种方式,如果只是等待某些服务再次可用,我可以给导致失败的任何时间恢复。如果这不起作用,则消息无论如何都可以转到 DLQ。
天蓝色服务总线中是否有一些设置可以实现这一点,还是我必须将它编程到我自己的应用程序中?
node.js - 我们如何在 node-rhea 中使用单个容器监听多个队列?
我有一个 node.js 应用程序,我在其中使用实现 AMQP 0.9.1的amqplib
我有多个接收数据的队列。
这些队列中的每一个都有一个处理函数,用于处理其各自队列中的消息。
我正在使用具有全新消息处理概念的rhea库升级到 AMQP 1.0。我正在尝试使用每个队列的不同处理程序来处理每条消息,但我无法弄清楚如何在连接/会话中收听特定队列。
我只能像这里所做的那样在容器级别上收听:https ://github.com/amqp/rhea/blob/HEAD/examples/simple_recv.js
我需要在每个队列上打开一个新容器吗?这是唯一的方法吗?
提前致谢。
node.js - nodejs rhea npm for amqp couldn't create subscription queue on address in activemq artemis
I have an address "pubsub.foo" already configured as multicast in broker.xml
.
As per the Artemis documentation:
When clients connect to an address with the multicast element, a subscription queue for the client will be automatically created for the client.
I am creating a simple utility using rhea AMQP Node.js npm to publish messages to the address.
I enabled debug log and while running the client code I see the message like this.
I also tried different variations of the topic, for example, client1.pubsub.foo
, pubsub.foo::client1
however no luck from the client code. Please share your thoughts. I am new to ActiveMQ Artemis.
javascript - 如何确保 AMQP 消息不会丢失,以防 rhea 订阅者出错?
因此,我在 JS 中使用 rhea 设计了一个基本的发布者-订阅者模型,该模型接受一个 API 请求以将数据保存在 DB 中,然后将其发布到队列中。
从那里订阅者(下面添加的代码)将其拾取并尝试将其保存在数据库中。现在我的问题是这个数据库实例在开发期间经历了很多变化,并且在插入操作期间可能会导致错误。
所以现在当订阅者尝试推送到这个数据库并导致错误时,数据会因为出队而丢失。我是 JS 的新手,所以有没有办法确保消息不会出队,除非我们确定它被正确保存而不必再次发布错误?
我的订阅者的代码:
node.js - 如何使用 ActiveMQ 仪表板查看消费者的优先级?
从 ActiveMQ 文档(https://activemq.apache.org/consumer-priority)来看,似乎代理确实支持消费者优先级。我正在尝试使用 nodejs 和 rhea 库来使用此功能。问题是,没有明确的文档如何做到这一点——我什至找不到一个地方来查看我是否设法改变了消费者的优先级。仪表板 (localhost:8161) 确实显示了消息的优先级,但如果消费者..
node.js - 延迟 ActiveMQ AMQP 1.0 的消息传递,_without_ JMS
为了澄清标题:我使用的是 ActiveMQ 5.15.15(不是 Artemis 引擎),并且我使用的是没有官方 JMS 库的 AMQP 1.0。更具体地说,我使用的是 AmazonMQ 版本,它将很快升级到 5.16.2。如果需要,我可以强制升级。
我正在使用一个AMQP 1.0 兼容库(rhea),到目前为止它为我们提供了很好的服务,但我没有找到任何关于如何让 ActiveMQ 的重新交付插件与我的库一起使用的文档。库维护者也不知道它是如何通过 ActiveMQ 公开的。
尽管尝试添加各种标头、传递注释、消息注释或应用程序属性,但我无法让重新传递插件工作。我schedulerSupport="true"
的代理元素中有服务器配置。
这些是我尝试过的键,值是数字。例如,30000
在允许消费者/订阅者看到队列中的消息之前的 30 秒。我在各种文档中看到了它们,尝试它们并没有什么坏处。
AMQ_SCHEDULED_DELAY
x-opt-delivery-delay
_AMQ_SCHED_DELIVERY
我还release
收到了来自客户端的消息,这意味着它无法传递(还将一个值传递给代理失败并增加尝试传递计数)。虽然交付尝试的数量增加了,但延迟和指数退避似乎并没有在经纪人层面发挥作用。
我看到 STOMP 协议在发布时允许使用标头,这允许更清楚地设置选项。但是,除非这样做有意义,否则我不想切换所有内容。
我还看到了另一种通过 REST API 将延迟消息作为主题发送的功能,但我不确定这是否旨在成为生产用例。
所以现在,我要么在看:
- 将消息在内存中保留一段时间,并在延迟后尝试重新发布或释放它
- 调查STOMP,看看redelivery插件是否适用于它
但我希望有人知道在哪里实现这一点。
我的 redeliveryPolicy 是基本的:
更新
我正在使用 auth 插件,并且有一个条目似乎是用于内置进程的。我认为这来自示例/默认配置。快速搜索似乎没有很多关于此的文档。我可以尝试向其他用户开放访问权限,但在当前设置下,每次更新/重新启动最多可能需要 15 分钟。
评论澄清
- 我的主要目标是延迟重新传递,因此消费者不会看到被放回队列 n 秒内的失败消息。
- 我从没有特殊的标题/属性/注释+重新交付插件开始,这也不起作用。