问题标签 [message-bus]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
259 浏览

ruby-on-rails - Rails 5.04 上的基本 message_bus gem 设置

我正在尝试在全新安装的 Rails 5.04 上设置 MessageBus gem ( https://github.com/SamSaffron/message_bus )

到目前为止我已经

  1. 运行bundle
  2. 添加message-bus.js并包含在application.js
  3. 创建了一个初始化器
  4. 运行了一个rails脚手架生成器
  5. 启动了一个request.js订阅的客户端/TestChannel

当我去测试应用程序时,看起来客户端正在轮询(基于服务器日志),但是,当我尝试在 rails 控制台中发送消息时,客户端没有收到它。然而,根据服务器日志,它似乎试图发送它。

例子:

Rails 控制台:

MessageBus.publish "/TestChannel", "hello"在导轨控制台中

服务器日志:

=> Delivering messages [] to client e4ac8e6cfeb340a8a389468bf0536053 for user (chunked)

浏览器:没有触发。

任何让客户触发的帮助将不胜感激。

0 投票
0 回答
270 浏览

design-patterns - 了解企业集成消息传递模式

我目前正在阅读 Enterprise Integration Patterns: Messaging 这本书。

但是我仍然很难理解模式及其应用程序,尤其是在查看现有实现时,例如 NServiceBus、MassTransit 和另一种方式的 RabbitMQ。

http://www.enterpriseintegrationpatterns.com/patterns/messaging/index.html

例如,在“消息传递渠道”部分中,有PointToPoint, PubSubAND但我并没有真正了解与其他类型渠道相比MessageBus背后的原因。MessageBus

我正在尝试为工作和教育目的进行自定义实现,虽然我不想获得功能齐全的库,但我想适当地使用它。

我找不到任何关于每种模式(或至少是最常见的模式)的简单实现并将它们连接在一起的材料。

例如,Endpoint我理解它的目的是既可以是 a 也可以是ReceiveraSender所以这意味着它可以send(Message)或者receive()但是,与 有什么区别MessageBus

所以这意味着我可以订阅一个Endpointwith subscribe(MessageType, Handler)

我也没有真正看到它如何与队列系统(例如 RabbitMQ 或自定义 SQL 实现)连接。

Endpoint 是否通过队列从队列中入队/出队来交换消息?这会发生在 Channel 中吗?

0 投票
1 回答
508 浏览

redis - Redis messagebus vs. d-bus:底层发生了什么?基本上是一样的吗?

老实说,我很惊讶我以前没有看到过这个讨论。我的印象是 Redis 使用普通的互联网数据包交换到服务器并获取结果,但 d-bus 使用 unix 套接字(管道)并叠加了一个分发系统。我想我通常会问哪个更快,但我真的很想知道当我使用这两个中的任何一个传递消息时到底发生了什么。两者都可能有优势。有人可以阐明一点吗?

0 投票
2 回答
293 浏览

javascript - Postal.js 订阅未收到来自其他反应组件的帖子

我在一个页面上运行了两个解耦组件,并且希望在单击一个到另一个时使用 Postal.js 发送消息。

单击第一个组件时,它会发布一个帖子:

然后在第二个组件中,我在 componentDidMount() 中订阅了该频道:


当我向订阅的组件添加窃听器时,它显示频道已成功订阅,但从未显示来自其他组件的帖子:

当我向发布组件添加窃听时,它显示消息已发布:

我在这里做错了什么?

0 投票
2 回答
938 浏览

integration - 使用 Azure 服务总线在 2 个组织之间进行通信

我们希望解耦 2 个独立组织之间的系统(例如:一个可能是一组内部应用程序,另一个可能是一组 3rd 方应用程序)。虽然我们可以使用基于 REST 的 API 来做到这一点,但我们希望实现时间解耦、可伸缩性、可靠和持久的通信、工作负载解耦(通过扇出)等。正是出于这些原因,我们希望使用消息公共汽车。

现在可以使用 Amazon 的 SNS 和 SQS 作为消息总线基础设施,我们的组织将有一个 SNS 实例,该实例将发布到第 3 方 SQS 实例。同样,对于第 3 方希望发送给我们的消息,他们会发布到他们的 SNS 实例,然后再发布到我们的 SQS 实例。请参阅:与 Amazon SNS 的跨账户集成

我在考虑如何使用 Azure 服务总线 (ASB) 实现这种跨组织集成,因为我们在 Azure 上投入了大量资金。但是,ASB 没有能力从一个实例发布到属于不同组织的另一个实例(或者甚至到同一组织中的另一个实例,至少目前还没有)。鉴于此限制,我们计划为第 3 方供应商提供单独的连接字符串集,允许他们收听和处理我们发布的消息,以及一组单独的连接字符串,允许他们将消息发布到主题然后我们可以订阅和处理。

我的问题是:这是个好主意吗?或者这会被认为是一种反模式吗?我最担心的事实是,虽然使用消息总线的目的是实现解耦,但 ASB 的基础设施部分使我们紧密耦合到我们需要在两个组织之间进行通信的点上,不仅是端点,而是此外,如何设置队列/主题(会话,或无会话,重复检测等)以及消费者与发送者如何发送消息(用作会话 ID、消息 ID 等的内容)紧密耦合。

这是一个有效的担忧吗?你做过吗?我可能会遇到哪些其他问题?

0 投票
1 回答
3531 浏览

microservices - 消息总线 C# 微服务的实现

我们在 .net core 1.0 中创建了一些微服务,我们遵循 CQRS 模式,我们还使用列出所有 api 的 swagger,我们有一个要求是我们需要实现消息总线(尚未决定可能是 AWS),这个消息总线将协调跨多个后端服务的 UI 操作,所以我不知道如何开始,因为我是新手,我需要了解消息总线、队列、将事件发布到消息总线,所以请你帮忙我明白吗?此外,任何指向教程视频等的指针以及解释都会有所帮助。

0 投票
1 回答
77 浏览

java - 消息总线的看门狗

所以我开始在一家半导体公司工作,作为一名在职学生,他们给我的任务是为 tibco 消息总线编写一个看门狗。它应该显示消息传递总线的工作量,如果它现在很忙。我还没有弄清楚一切,但据我了解架构,现在有像 EQC 和 EQS 这样的 Java 进程来处理消息传递并与消息传递总线交互。我们还有一些与数据库连接的服务。我们有一个包含非常大数据的大型数据库。我们通过 pl/sql 访问这些数据。所以我不知道如何开始这样的事情。你们知道如何开始/做到这一点吗?

0 投票
3 回答
1514 浏览

rabbitmq - RabbitMQ + MassTransit:如何从处理中取消排队的消息?

在某些特殊情况下,我需要以某种方式在接收点告诉消费者某些消息不应该被处理。否则两个系统将变得不同步(我们处理一些过时的外部系统,例如,如果连接断开,我们必须丢弃该连接范围内的所有排队操作)。

冒险并手动解决问题消息?补偿行动(在我的情况下可能很难支持)?还要别的吗?

0 投票
2 回答
7159 浏览

generics - 如果我有消息类型列表,如何在 MassTransit 中注册通用消费者适配器

我成功地将 MassTransit 用于一个愚蠢的示例应用程序,在该示例应用程序中,我从 Publisher 控制台应用程序发布消息(事件),并在两个不同的消费者处接收它,这两个消费者也是使用 RabbitMq 的控制台应用程序。

这是整个示例项目 git repo: https ://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

我想要一个包含 MassTransit 功能的项目,以便我的 Publisher 和 Consumers 项目对 MassTransit 一无所知。依赖项应该朝这个方向发展:

  • DiDrDe.MessageBus ==> 大众运输
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

请注意 DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个对任何消息类型都应该有效的通用项目。

为了实现这一点,我正在实现适配器模式,以便我的自定义接口IEventDtoBus(发布事件)和IEventDtoHandler<TEventDto>(使用事件)都是我的发布者和消费者知道的。MassTransit 包装器项目(称为 DiDrDe.MessageBus)使用EventDtoBusAdapter由 anIEventDtoBus和 a组成的适配器实现适配器,这是我唯一由 an 组成的EventDtoHandlerAdapter<TEventDto>泛型IConsumer<TEventDto>IEventDtoHandler<TEventDto>

我遇到的问题是 MassTransit 要求注册消费者的方式,因为我的消费者是通用消费者,并且 MassTransit 包装器在编译时不应该知道它的类型。

我需要找到一种方法来EventDtoHandlerAdapter<TEventDto>为我在运行时传递的每个 TEventDto 类型注册为消费者(例如,作为类型的集合)。请参阅我的存储库以获取所有详细信息。

MassTransit 支持接受类型的重载方法(好!正是我想要的),但它还需要第二个参数Func<type, object> consumerFactory,我不知道如何实现它。

更新 1问题是我无法注册这个通用消费者,例如:

因为我得到一个编译错误

严重性代码描述项目文件行抑制状态错误 CS0310“EventDtoHandlerAdapter”必须是具有公共无参数构造函数的非抽象类型,才能将其用作泛型类型或方法“ConsumerExtensions.Consumer”中的参数“TConsumer”。Consumer(IReceiveEndpointConfigurator,Action> )' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

更新 2:我尝试了几件事,并在我的仓库中更新了项目。这些是我在 MassTransit 包装器项目中的尝试。请注意,如果我向要处理的每条消息(事件)添加依赖项,我是如何让一切正常工作的。但我不希望那样......我不希望这个项目知道任何关于它可以处理的消息的信息。如果我能注册只知道消息类型的消费者就好了..

更新 3:按照 Igor 的建议,我尝试执行以下操作:

但我得到一个运行时错误说

请求的服务 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' 尚未注册。要避免此异常,请注册一个组件以提供服务,使用 IsRegistered() 检查服务注册,或使用 ResolveOptional() 方法解决可选依赖项。

我什至尝试EventDtoHandlerAdapter<>单独注册为 IConsumer,以防万一这是问题,但没有运气。

还有:

它告诉我

System.ObjectDisposedException: '此解析操作已经结束。使用 lambda 注册组件时,无法存储 lambda 的 IComponentContext 'c' 参数。相反,要么从 'c' 再次解析 IComponentContext,要么解析基于 Func<> 的工厂以从

澄清一下,我唯一需要注册的消费者是我的EventDtoHandlerAdapter<TEventDto>. 它是通用的,所以本质上它会存在我支持的每个 TEventDto 的注册。问题是我不需要提前使用类型,所以我需要使用类型进行操作。

更新 4: Igor 建议的新尝试。这次是“代理”。我已经用最后一次尝试更新了我的回购所有细节。我有我的消费者和标记界面:

我有自己的消费者实现,对大众运输一无所知:

现在我的“包装消费者”就是我所说的适配器,因为它知道 MassTransit(它实现了 MassTransit IConsumer)。

现在最后一步是向 MassTransit 注册我的“包装消费者”。但由于它是通用的,我不知道该怎么做。这就是问题所在。

我可以按照以下建议在 Autofac 中扫描和注册我的所有消费者类型:

所以现在我有了所有的消费者类型(所有的实现IEventDtoHandler,包括我的ThingHappenedHandler)。现在怎么办?如何注册?

像下面这样的东西不起作用

但是我想它不起作用是正常的,因为我要注册的是我的EventDtoHandlerAdapter,这是真实的IConsumer

所以,我想我没有理解你的建议。对不起!

我需要的是这样的:

但是没有使用 ThingHappened 模型,因为该模型不应该是已知的。这是我仍然卡住的地方

更新 5: Chris Patterson 建议的新尝试(他的解决方案已合并到我的仓库中的 master 中),但问题仍然存在。

澄清一下,DiDrDe.MessageBus必须与任何发布者、消费者和模型无关。它应该只依赖于 MassTransit 和DiDrDe.Contracts,而 Chris 的解决方案有如下一行:

这直接依赖于ThingHappened模型。这是不允许的,它实际上与我已经拥有的解决方案没有太大不同:

抱歉,如果这还不够清楚,但 DiDrDe.MessageBus 最终将成为许多不同的消费者和发布者项目之间共享的 nuGet 包,它不应该对任何特定的消息/模型有任何依赖关系。

更新6: 问题已解决。非常感谢 Igor 和 Chris 的时间和帮助。我已将解决方案推送到我的仓库中。

PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,此解决方案有其局限性,因为似乎只有一个处理程序正在执行(两次)。我希望两个处理程序都被执行或只执行一个,但只执行一次(不是两次)。但这已经是另一个主题了:)

0 投票
1 回答
1950 浏览

c++ - AMQP-CPP - 基于事件的方法

我一直在使用 AMQP-CPP 进行 RabbitMQ 服务器通信。我使用的示例客户端基于基于轮询的方法。任何人都可以使用基于事件的消息通信(C++)吗?

如果是,我会恳请您分享样品,

提前致谢。