2

我正在使用 Rebus,我想在“避免多次处理事件”段落中介绍CQRS Journey中描述的内容,但我无法弄清楚。

我将 Rebus 配置为使用 SQL Server for Transport和 MongoDB for SubscriptionsSagasRouting配置为基于Type,并将所有命令处理程序的类型映射到Transport中配置的队列。

 var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            .Logging(l => l.Trace())
            .Transport(t =>
            {
                t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
            })
            .Routing(r => r.TypeBased()
                            .MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
                            .MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
                             )
            .Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
            {
                return sagaType.Name;
            }))
            .Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
            .Options(o =>
            {
                o.SetNumberOfWorkers(1);
                o.SetMaxParallelism(1);
                o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
            })
            .Start();

现在我应该配置 Rebus,当命令发布事件时,它会复制到与现有订阅者类型一样多的单独主题(虚拟或物理队列)中。

就像是:

bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();

感谢帮助。

4

1 回答 1

2

您的问题似乎有些令人困惑。

但我想你的基本问题是如何确保每条消息只由每个订阅者处理一次。

答案很简单:每个订阅者都有一个单独的端点——这意味着每个订阅者都有自己的输入队列,从中处理消息,以及返回失败的消息。

然后,您可以根据需要在每个订阅者中拥有尽可能多或尽可能少的处理程序。将为每个传入消息执行所有兼容的处理程序。

使用 Rebus,每次调用Configure.With(...).(...).Start()都会给你一个单独的端点 - 所以在你的情况下,我建议你将订阅者端点的创建包装在一个方法中,然后你可以像这样调用它:

var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();

var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();

var event3Subscriber = CreateSubscriber("subscriber_event3");    
event3Subscriber.Subscribe<Event3>().Wait();

// ...

那么哪里CreateSubscriber会是这样的:

public IBus CreateSubscriber(string queueName)
{
    return Configure.With(GetContainerAdapter())
        .Transport(t => t.UseMsmq(queueName))
        .Start();        
}
于 2016-03-16T06:45:13.860 回答