我正在寻找一种方法来连接消息处理管道并在消费者处理完一些消息后做一些工作。我的意图是打开一个新会话并开始一个事务(可以在 IoC 容器中完成),然后再处理和处理它们。
在 NServiceBus 中,我会使用 IMessageModule 接口来挂钩。有类似的东西吗?实际上,处理处理程序也可以这样做,但是由于我使用 StructureMap 作为 ObjectBuilder,Release 方法什么也不做。
我正在寻找一种方法来连接消息处理管道并在消费者处理完一些消息后做一些工作。我的意图是打开一个新会话并开始一个事务(可以在 IoC 容器中完成),然后再处理和处理它们。
在 NServiceBus 中,我会使用 IMessageModule 接口来挂钩。有类似的东西吗?实际上,处理处理程序也可以这样做,但是由于我使用 StructureMap 作为 ObjectBuilder,Release 方法什么也不做。
您可以注册一个拦截器,以便在每条消息被使用之前和之后调用。举个例子:
LocalBus = ServiceBusConfigurator.New(x =>
{
x.ReceiveFrom("loopback://localhost/mt_client");
x.BeforeConsumingMessage(() => { _before.Set(); });
x.AfterConsumingMessage(() => { _after.Set(); });
});
查看 MassTransit.Tests 项目中的 MessageInterceptor_Specs.cs 文件以进行工作单元测试。
我经历了同样的挑战,这就是我如何去做的。我们有一个 IUnitOfWork 和一个 ITransaction:使用 Commit() 和 Rollback() 我们为 TransactionalOperation 添加了一个类:ITransaction 来为非数据库的东西添加事务行为支持。IUnitOfWork.Commit() 遍历可能已添加到其中的 TransactionalOperations 列表。
现在将总线连接到我们的系统:添加了一个 IBus 来包装外部总线 实现了一个 MassTransitBusGateway:IBus 然后将总线连接到工作单元:实现了 UnitOfWorkBus:IBus(装饰器) - 这个装饰器对 Publish() 进行任何调用知道工作单元并将其添加为 TransactionalOperation 以便其执行延迟到 UnitOfWork.Commit()
通过这种方式,我们将具体总线抽象化,从而避免在多个项目上添加 MassTransit 依赖项(只有客户真正需要它)并在其操作中添加事务行为。