问题标签 [message-bus]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1258 浏览

message-queue - 具有广播和路由功能的轻量级消息总线?

我正在尝试找到可以处理以下内容的最轻量级的消息总线(队列?):

  1. 生产者 A 订阅总线。总线是通过众所周知的标识形式(如名称、套接字或其他东西)指定的。
  2. 消费者 B 订阅相同的总线并仅注册某种类型的消息。
  3. 消费者 C 订阅了同一条总线,并注册了另一种与 B 重叠的消息。
  4. 生产者 A 将一条消息放入总线,使 B 和 C 都感兴趣。B 和 C 都接收到消息(不仅仅是其中一个,而是两个)。

A、B、C 和总线位于不同的机器中。

0 投票
0 回答
128 浏览

c# - WCF - 如何在负载均衡器后面自动订阅端点地址?

我创建了一个迷你消息总线,它在其客户端上运行操作(例如清除它们的缓存)。我正在尝试使其动态化,以便添加的每个新应用程序都会自动将其端点订阅到总线,以便总线稍后可以与它通信。应用程序位于负载均衡器后面,因此我尝试使用每个新应用程序的显式 IP 进行订阅。我收到“无法为具有权限的 SSL/TLS 安全通道建立信任关系”的错误,因为证书是颁发给域名而不是 IP 并且我处于 TransportWithMessageCredential 安全模式(我不喜欢使用ServicePointManager.ServerCertificateValidationCallback 技巧,因为它适用于整个应用程序)。

知道如何实现这种动态行为,以便添加的每个新服务器都可以订阅消息总线,并且消息总线可以专门在该服务器节点上运行操作吗?

0 投票
1 回答
5162 浏览

c# - 没有依赖注入的命令总线/调度程序和处理程序注册

我正在尝试实现一个可消耗的库,该库在 CQRS 的上下文中为每个域提供读/写应用程序服务。命令总线(或调度程序,或在这种情况下可以调用的任何东西)接口可能会或可能不会暴露,但应该从消费者那里抽象出实现,以鼓励对接口定义的合约进行编程。我不想要求图书馆的消费者必须在他们的 DI 框架中设置图书馆,而不是使用标准约定,所以使用的 DI 框架应该无关紧要(要求基于约定的 DI 超出了这个问题的范围) .

主要问题在于我希望 ICommandHandler 的内部实现以某种方式自动注册到命令总线,但构造函数不采用泛型,如下面的实现:

使用 DI 容器(在本例中为 Ninject),如果 ICommandBus 稍作更改,则不允许注册的实现可能如下所示:

我还在Mark Seeman 的博客上阅读过类似这样的文章,这些文章描述了在没有服务位置或依赖 DI 容器(通过“穷人的 DI”)的情况下执行此操作的方法,但是我无法通过 Ninject 获得这种解决方案。全部(更不用说以某种方式使用约定或不依赖 DI 为我完成工作),并且似乎需要更多的“锅炉”代码才能将事情连接起来。

关于如何在不必在某处显式注册处理程序的情况下进行此操作的任何建议?我关于通过注册命令处理程序来允许可扩展性的想法是否有效?

0 投票
1 回答
252 浏览

scala - 使用消息总线代替参与者之间的常规消息传递(例如,在 scala 中)

我有一个 Java Web 服务,我将在 Scala 中从头开始重新实现它。我有一个基于演员的新代码设计,大约有 10-20 个演员。其中一个用例的流程如下:

ActorA获取一条消息a,创建数十条b消息由 Actor 处理B(可能是多个实例,用于负载平衡),c为 Actor 生成多条消息C,等等。

在上面的场景中,一条消息a可能会导致来回发送几千条消息,但我预计a一天不会超过几条消息(是的,目前它不是一个繁忙的服务)。

我有以下要求:

  1. 消息不应丢失或重复。我的意思是如果系统在处理 b 消息的过程中重新启动,则应该在重新启动后拾取未处理的消息。另一方面,处理过的消息不应该被再次获取(这些消息最终会开始一些大的计算,重复它们是昂贵的)。
  2. 它应该易于扩展。我的意思是将来,我可能想在系统中添加一些其他组件,这些组件可以读取所有通信(或部分通信),例如记录发生的事情,或者计算处理了多少 b 消息,或者用 b 消息做一些新的事情(在已经发生的事情旁边)等等。请注意,这些“组件”可以是用其他语言编写的独立应用程序。

我是消息总线技术的新手,但从我所读到的内容来看,这些要求在我看来就像“消息总线”提供的东西,如 RabbitMQ、Kafka、Kestrel,但我也看到 akka 也提供了一些持久性方法。我的问题是,考虑到各种可能性,我不知道该使用哪种技术。我读到像 Kafka 这样的东西对我的应用程序来说可能是一种过度杀伤力。但我也不确定 akka 持久性是否能满足我的两个要求(尤其是可扩展性)。

我的问题是:我应该选择企业消息总线吗?卡夫卡之类的?或者像 akka 持久性这样的东西会做吗?或者,如果我自己实现某些东西(例如,支持 AMQP 以允许可扩展性),它会更快更合适吗?

当然,如果您知道适合此目的的东西,也欢迎具体的技术建议。

0 投票
3 回答
6235 浏览

python - 发布订阅和消息总线 Python

我正在尝试为我的一些 python 模块创建一个中央日志记录系统。我希望能够从多个带有日志的模块发送消息,然后中央记录器将它们接收并进行处理。

在此处输入图像描述

为简单起见,我希望我的模块 A 看起来像这样:

和记录器(唯一的订阅者)

现在 Logger 模块就像一个库,所以它不是持续运行的,所以也许我可以在 Logger 和 Message Bus 之间引入一些东西,它会不断地观察新消息。

我看过PyPubSub但它似乎没有在文档中介绍不同正在运行的 python 模块之间的持久通信。如果有人尝试过这个,如果我可以在不同的模块之间使用它,它对我有用。

另一个问题是我最终可能会得到不是用 python 编写的模块,所以我真的不想要模块 A、B 和 Logger 之间的直接通信。最后我的架构可能是这样的: 在此处输入图像描述

我希望上面的信息不会令人困惑。

tl;dr:在 python 中使用持久消息总线和不断等待新消息的订阅者发布-订阅。有现成的解决方案吗?

编辑:我正在考虑运行一个知道 Logger 模块的 Web 套接字服务器,而其他模块 A、B 知道 websocket 的地址。这种设计有什么缺点吗?

0 投票
1 回答
1153 浏览

java - 用于内部微服务调用的消息总线与 Quasar/HTTP

我正在寻找优化当前使用 HTTP/REST 进行内部节点到节点通信的微服务架构。

一种选择是在服务中实现背压功能,(例如)通过将类似 Quasar 的东西集成到堆栈中。这无疑会改善情况。但我看到了一些挑战。一种是,异步客户端线程是瞬态的(在内存中),并且在客户端失败(崩溃)时,这些重试线程将丢失。第二,理论上,如果目标服务器宕机一段时间,客户端最终可能会到达 OOM 尝试重试,因为线程最终是有限的,甚至是 Quasar Fibers。

我知道这有点偏执,但我想知道基于队列的替代方案在非常大的规模上是否更有利。

它仍然可以像 Quasar/fibers 一样异步工作,除了 a) 队列是集中管理的,并且脱离客户端 JVM,b) 队列可以是持久的,因此在客户端和/或目标服务器出现故障时,没有飞行中的消息丢失了。

当然,排队的缺点是有更多的跃点,它会减慢系统的速度。但我认为可能存在一个甜蜜点,即 Quasar ROI 达到峰值,集中且持久的队列对于扩展和 HA 变得更加重要。

我的问题是:

是否讨论过这种权衡?是否有关于使用集中式外部队列/路由器方法进行服务内通信的论文。

TL;博士; 我刚刚意识到我可以将这个问题表述为:

“什么时候适合在微服务架构中使用基于消息总线的服务内通信而不是直接 HTTP。”

0 投票
1 回答
460 浏览

c# - ObserveOnDispatcher 不会在 UI 线程中调用处理程序

我有一个消息总线类,它使用 Rx 在 WPF 应用程序中推送多个线程事件。我的问题 ObserveOnDispatcher 没有在 UI 线程中调用事件处理程序。

代码:


0 投票
1 回答
1966 浏览

c# - 如何停止 MassTransit 为错误消息创建交换绑定

我正在尝试收听错误队列以处理失败的消息,但我似乎无法让 MassTransit 不对我希望它在配置中收听的消息设置绑定。配置如下,使用 MassTransit v3:

在上面的示例中,它将为发布的任何内容设置绑定,SomeMessage并将它们定向到myqueue_error我只希望消息进入该队列,该队列已从失败的服务转发。有没有办法从队列中消费消息,但告诉 MassTransit 不要为它们绑定?

更新 - 潜在的解决方案

似乎我不需要设置 ReceiveEndpoint 但我可以重命名控制总线以接受我关心的消息,然后这将能够处理这些消息而无需为消息创建交换绑定。

下面是修改后的代码,不确定这是否是一种理想的方式,但它可以工作

0 投票
1 回答
1399 浏览

python - 使用 pykafka 在主题的特定分区上发布

如何在pykafka主题的特定分区上发布消息。在下面的一段代码中,测试主题有四个分区,我打算在其中一个分区中写入每条消息,但显然它不是那样工作的。

0 投票
1 回答
2340 浏览

ruby-on-rails - NotImplementedError(仅支持部分劫持。)使用 message_bus

我只是将 gem 添加到 gemfile 并写入 js 控制台:

它抛出:

为此,我基于大师: