我正在使用最新版本的 Rebus (0.99.35) 和 SimpleInjector (3.1.2)。在我的第一个示例项目中,我使用 SQL Server 进行传输和 Sagas。
问题是 Saga 方法Handle(StartTheSagaMessage message)
,实现IAmInitiatedBy<StartTheSagaMessage>
,被调用了 5 次,我不明白为什么。此外,此方法会向自身发布一条总线从未接收到的消息。
以下是配置代码:
var container = new Container();
var assemblies = AppDomain.CurrentDomain.GetAssemblies()
.Where(i => i.FullName.StartsWith("Messages"));
container.RegisterCollection(typeof(IHandleMessages<>), assemblies);
var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
.Logging(l => l.Trace())
.Transport(t => t.UseSqlServer(connectionstring, "Messages", "consumer"))
.Routing(r => r.TypeBased().MapAssemblyOf<Job>("consumer"))
.Sagas(s => s.StoreInSqlServer(connectionstring, "Sagas", "SagaIndexTable"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(1);
})
.Start();
container.Verify();
bus.Subscribe<Step1FinishedMessage>().Wait();
bus.Subscribe<Step2FinishedMessage>().Wait();
var procId = Guid.NewGuid();
bus.Send(new StartTheSagaMessage() { ProcessId = procId });
和传奇代码:
public class MySaga : Saga<MySagaData>,
IAmInitiatedBy<StartTheSagaMessage>,
IHandleMessages<Step1FinishedMessage>,
IHandleMessages<Step2FinishedMessage>
{
public IBus Bus { get; set; }
protected override void CorrelateMessages(ICorrelationConfig<MySagaData> config)
{
config.Correlate<StartTheSagaMessage>(m => m.ProcessId, s => s.SagaProcessId);
config.Correlate<Step1FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
config.Correlate<Step2FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
}
public async Task Handle(StartTheSagaMessage message)
{
if (IsNew == false)
return;
Trace.TraceInformation("Mysaga - got StartTheSagaMessage: {0}", message.ProcessId);
//The saga is started - Do some stuff - call webservices (in external handler)
//When this step is finished the external process replies with a "step1FinishedMessage"
this.Data.SagaProcessId = message.ProcessId;
//Fake Step1FinishMessage (should be replied from external handler)
await Bus.Send(new Step1FinishedMessage() { ProcessId = this.Data.SagaProcessId });
}
public async Task Handle(Step1FinishedMessage message)
{
Trace.TraceInformation("Mysaga - got Step1FinishedMessage: {0}", message.ProcessId);
//Sagabehaviour when the Step1 is finished by the external handler
this.Data.Step1Finished = true;
//After dalying 10 seconds - Send a step2finishedmessage
await Bus.Defer(TimeSpan.FromSeconds(10), new Step2FinishedMessage() { ProcessId = this.Data.SagaProcessId });
}
public async Task Handle(Step2FinishedMessage message)
{
await Task.Run(() =>
//return Task.FromResult<void>(() =>
{
Trace.TraceInformation("Mysaga - got Step2FinishedMessage: {0}", message.ProcessId);
//Step2 is handled - finished the saga
this.Data.Step2Finished = true;
this.MarkAsComplete();
});
}
}
完整示例基于此处提供的解决方案。
我做错了什么?
感谢帮助。