几天来,我一直在与这个问题作斗争,但找不到真正有效的方法。
我的场景是我有一个状态机 saga,我想在运行 saga 的同一个事务(实体框架)中做一些事情,这样状态和业务就可以在一起了。
现在,我知道状态机本身不应该有任何依赖关系,因此Activity(x => x.OfInstanceType<MyActivity>)
可以添加这个活动,它是从 DI 容器解析的,并且可以在其中包含任何依赖项(服务等)。到目前为止,一切都很好...
我的问题是,无论我在活动中执行什么,我都无法让它正常工作。它从容器中解析并Execute
调用该方法,但随后它以某种方式退出。
似乎抛出了一些异常,但这显然不会冒泡到测试工具中。
我正在使用微软的 dotnet core Dependency Injection 库。
这是一些代码
public class MyStateMachine : MassTransitStateMachine<SagaInstance>
{
public MyStateMachine()
{
InstanceState(instance => instance.CurrentState);
Event(() => Start, x => x.CorrelateBy(saga => saga.CorrelationId, context => context.Message.CorrelationId));
Initially(
When(Start)
.Activity(c => c.OfType<MyActivity>())
.TransitionTo(Running)
.Publish(new Started())
);
}
Event<Run> Start { get; set;}
State Running { get; set; }
}
public class MyActivity : Activity<SagaInstance, Run>
{
private readonly IMyService _service;
public MyActivity(IMyService service)
{
_service = service;
}
public async Task Execute(BehaviorContext<SagaInstance, Run> context, Behavior<SagaInstance, Run> next)
{
//throw new Exception("BOO"); // uncommenting this line doesn't throw anywhere
_service.DoThatThing();
await next.Execute(context).ConfigureAwait(false);
}
}
[Fact]
public async Task RunMessageSent_NonexistingSaga_StateIsRunning()
{
var harness = new InMemoryTestHarness();
var machine = new MyStateMachine();
var collection = new ServiceCollection();
collection.AddMassTransit();
collection.RegisterInMemorySagaRepository<SagaInstance>();
collection.RegisterSagaStateMachine<MyStateMachine, SagaInstance>();
collection.AddScoped<MyActivity>();
collection.AddScoped<MyService>();
var provider = collection.BuildServiceProvider();
harness.OnConfigureInMemoryReceiveEndpoint += cfg => cfg.StateMachineSaga(machine, provider);
await harness.Start();
try
{
var guid = CorrelationId = Guid.NewGuid();
await harness.InputQueueSendEndpoint.Send(new Run({CorrelationId = guid}));
var repo = provider.GetService<ISagaRepository<SagaInstance>>() as InMemorySagaRepository<SagaInstance>;
var saga = await repo.ShouldContainSaga(s => s.Serial == serial, timeout: TimeSpan.FromSeconds(1));
Assert.NotNull(saga);
// this Equal fails saying the saga is in state Initial
Assert.Equal(machine.Running.Name, repo[saga.Value].Instance.CurrentState);
}
finally
{
await harness.Stop();
}
}