2

几天来,我一直在与这个问题作斗争,但找不到真正有效的方法。

我的场景是我有一个状态机 saga,我想在运行 saga 的同一个事务(实体框架)中做一些事情,这样状态和业务就可以在一起了。

现在,我知道状态机本身不应该有任何依赖关系,因此Activity(x => x.OfInstanceType<MyActivity>)可以添加这个活动,它是从 DI 容器解析的,并且可以在其中包含任何依赖项(服务等)。到目前为止,一切都很好...

我的问题是,无论我在活动中执行什么,我都无法让它正常工作。它从容器中解析并Execute调用该方法,但随后它以某种方式退出。
似乎抛出了一些异常,但这显然不会冒泡到测试工具中。

我正在使用微软的 dotnet core Dependency Injection 库。

这是一些代码

public class MyStateMachine : MassTransitStateMachine<SagaInstance>
{
    public MyStateMachine()
    {
        InstanceState(instance => instance.CurrentState);

        Event(() => Start, x => x.CorrelateBy(saga => saga.CorrelationId, context => context.Message.CorrelationId));

        Initially(
            When(Start)
                .Activity(c => c.OfType<MyActivity>())
                .TransitionTo(Running)
                .Publish(new Started())
        );
    }   

    Event<Run> Start { get; set;} 
    State Running { get; set; }
}

public class MyActivity : Activity<SagaInstance, Run>
{
    private readonly IMyService _service;
    public MyActivity(IMyService service)
    {
        _service = service;
    }

    public async Task Execute(BehaviorContext<SagaInstance, Run> context, Behavior<SagaInstance, Run> next)
    {   
        //throw new Exception("BOO"); // uncommenting this line doesn't throw anywhere
        _service.DoThatThing();
        await next.Execute(context).ConfigureAwait(false);
    }
}

[Fact]
public async Task RunMessageSent_NonexistingSaga_StateIsRunning()
{
    var harness = new InMemoryTestHarness();
    var machine = new MyStateMachine();

    var collection = new ServiceCollection();
    collection.AddMassTransit();
    collection.RegisterInMemorySagaRepository<SagaInstance>();
    collection.RegisterSagaStateMachine<MyStateMachine, SagaInstance>();
    collection.AddScoped<MyActivity>();
    collection.AddScoped<MyService>();

    var provider = collection.BuildServiceProvider();

    harness.OnConfigureInMemoryReceiveEndpoint += cfg => cfg.StateMachineSaga(machine, provider);

    await harness.Start();

    try
    {
        var guid = CorrelationId = Guid.NewGuid();
        await harness.InputQueueSendEndpoint.Send(new Run({CorrelationId = guid}));

        var repo = provider.GetService<ISagaRepository<SagaInstance>>() as InMemorySagaRepository<SagaInstance>;
        var saga = await repo.ShouldContainSaga(s => s.Serial == serial, timeout: TimeSpan.FromSeconds(1));

        Assert.NotNull(saga);
        // this Equal fails saying the saga is in state Initial
        Assert.Equal(machine.Running.Name, repo[saga.Value].Instance.CurrentState);
    }
    finally
    {
        await harness.Stop();
    }   
}
4

2 回答 2

1

我在 dotnet core 2.1 上运行,经过大量谷歌搜索和浏览 Masstransit 存储库后,我意识到在最新的 (5.3) Masstransit 中有更好的依赖注入框架 API,我一开始不想升级到以避免将 dotnet 运行时依赖项也升级到 2.2。

完成升级后,我开始意识到一定有一些异步问题,因为我有时让它随机工作。然后我意识到 in mem repo 上还有另一种方法,称为ShouldContainSagaInState.

所以通过使用这条线,我得到了它的工作!

var saga = await repo.ShouldContainSagaInState(guid, machine, x => x.Running, TimeSpan.FromSeconds(1));

同样按照我打算的方式运行代码,使用请求/响应模式,潜在的异常作为故障消息发送回调用者。在测试工具中,这条消息以某种方式可用,但我还没有找到合适的方法来检查它。

于 2019-05-28T09:27:22.380 回答
0

我创建了一个简单的活动,在 IOC 中注册它,像你一样使用,但有异常:

GreenPipes.PayloadNotFoundException: The payload was not found: Automatonymous.IStateMachineActivityFactory
   at GreenPipes.PipeExtensions.GetPayload[TPayload](PipeContext context)
   at Automatonymous.Activities.ContainerFactoryActivity`3.Automatonymous.Activity<TInstance,TData>.Execute(BehaviorContext`2 context, Behavior`2 next)
   at Automatonymous.Activities.DataConverterActivity`2.Automatonymous.Activity<TInstance>.Execute[T](BehaviorContext`2 context, Behavior`2 next)
   at Automatonymous.Behaviors.LastBehavior`1.Automatonymous.Behavior<TInstance>.Execute[T](BehaviorContext`2 context)
   at Automatonymous.States.StateMachineState`1.Automatonymous.State<TInstance>.Raise[T](EventContext`2 context)
   at Automatonymous.States.StateMachineState`1.Automatonymous.State<TInstance>.Raise[T](EventContext`2 context)
   at Automatonymous.AutomatonymousStateMachine`1.Automatonymous.StateMachine<TInstance>.RaiseEvent[T](EventContext`2 context)
   at Automatonymous.Pipeline.StateMachineSagaMessageFilter`2.Send(SagaConsumeContext`2 context, IPipe`1 next)
   at MassTransit.EntityFrameworkCoreIntegration.Saga.EntityFrameworkSagaRepository`1.MassTransit.Saga.ISagaRepository<TSaga>.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next)

我使用 Masstransit 5.1.5 有什么问题?

于 2019-10-06T19:27:42.740 回答