1

我正在使用最新版本的 Rebus (0.99.35) 和 SimpleInjector (3.1.2)。在我的第一个示例项目中,我使用 SQL Server 进行传输和 Sagas。

问题是 Saga 方法Handle(StartTheSagaMessage message),实现IAmInitiatedBy<StartTheSagaMessage>,被调用了 5 次,我不明白为什么。此外,此方法会向自身发布一条总线从未接收到的消息。

以下是配置代码:

var container = new Container();

var assemblies = AppDomain.CurrentDomain.GetAssemblies()
    .Where(i => i.FullName.StartsWith("Messages"));

container.RegisterCollection(typeof(IHandleMessages<>), assemblies);

var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
    .Logging(l => l.Trace())
    .Transport(t => t.UseSqlServer(connectionstring, "Messages", "consumer"))
    .Routing(r => r.TypeBased().MapAssemblyOf<Job>("consumer"))
    .Sagas(s => s.StoreInSqlServer(connectionstring, "Sagas", "SagaIndexTable"))
    .Options(o =>
    {
        o.SetNumberOfWorkers(1);
        o.SetMaxParallelism(1);
    })
    .Start();

container.Verify();

bus.Subscribe<Step1FinishedMessage>().Wait();
bus.Subscribe<Step2FinishedMessage>().Wait();

var procId = Guid.NewGuid();
bus.Send(new StartTheSagaMessage() { ProcessId = procId });

和传奇代码:

public class MySaga : Saga<MySagaData>,
    IAmInitiatedBy<StartTheSagaMessage>,
    IHandleMessages<Step1FinishedMessage>,
    IHandleMessages<Step2FinishedMessage>
{
    public IBus Bus { get; set; }

    protected override void CorrelateMessages(ICorrelationConfig<MySagaData> config)
    {
        config.Correlate<StartTheSagaMessage>(m => m.ProcessId, s => s.SagaProcessId);
        config.Correlate<Step1FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
        config.Correlate<Step2FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
    }

    public async Task Handle(StartTheSagaMessage message)
    {
        if (IsNew == false)
            return;

        Trace.TraceInformation("Mysaga - got StartTheSagaMessage: {0}", message.ProcessId);
        //The saga is started - Do some stuff - call webservices (in external handler)
        //When this step is finished the external process replies with a "step1FinishedMessage"
        this.Data.SagaProcessId = message.ProcessId;
        //Fake Step1FinishMessage (should be replied from external handler)
        await Bus.Send(new Step1FinishedMessage() { ProcessId = this.Data.SagaProcessId });
    }

    public async Task Handle(Step1FinishedMessage message)
    {
        Trace.TraceInformation("Mysaga - got Step1FinishedMessage: {0}", message.ProcessId);
        //Sagabehaviour when the Step1 is finished by the external handler
        this.Data.Step1Finished = true;
        //After dalying 10 seconds - Send a step2finishedmessage
        await Bus.Defer(TimeSpan.FromSeconds(10), new Step2FinishedMessage() { ProcessId = this.Data.SagaProcessId });
    }

    public async Task Handle(Step2FinishedMessage message)
    {
        await Task.Run(() =>
        //return Task.FromResult<void>(() => 
        {
            Trace.TraceInformation("Mysaga - got Step2FinishedMessage: {0}", message.ProcessId);
            //Step2 is handled - finished the saga
            this.Data.Step2Finished = true;
            this.MarkAsComplete();
        });
    }
}

完整示例基于此处提供的解决方案

我做错了什么?

感谢帮助。

4

1 回答 1

1

我改变了 Saga 并且比它有效。

我放:

    private IBus _bus;

    public MySaga(IBus bus)
    {
        _bus = bus;
    }

代替:

    public IBus Bus { get; set; }

然后它起作用了!我不明白为什么,因为调试 Bus 在方法中不为空。

于 2016-03-03T09:08:12.470 回答