我正在将 Mass Transit 与 RabbitMq 和 Automatonymous 一起用于 asp.net core 2.1 应用程序的概念证明。我将 EntityFramework 核心与 Postgres 一起使用以实现持久性。
我要做的是在向 http rest api 发出请求时启动一个 saga,并在 saga 完成后返回结果。我正在做的是:
- 使用具有请求/响应客户端的接口连接一个事件以启动我的传奇
- 在传奇中发布一条由消费者消费的消息
- 在消费者中发布与我的传奇中的另一个事件相对应的消息
- 完成后从我的传奇中返回响应并最终确定
这是我的代码:
我的界面
public interface IStartSagaRequest
{
Guid CorrelationId { get; set; }
string Name {get; set;}
}
public interface IStartSagaResponse
{
Guid CorrelationId { get; set; }
bool DidComplete {get; set;}
}
public IDoOperationRequest
{
Guid CorrelationId { get; set; }
}
public IOperationComplete
{
Guid CorrelationId { get; set; }
bool OperationSuccessful {get; set;}
}
我的传奇实例
public class DoOperationSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public Name { get; set; }
public string CurrentState { get; set; }
}
用于在状态机中发布的 IDoOperationRequest 的具体实现
public class DoOperationRequestImpl : IDoOperationRequest
{
public Guid CorrelationId { get; set; }
}
用于在状态机中发布的 IStartSagaResponse 的具体实现
public class StartSagaResponse : IStartSagaResponse
{
public Guid CorrelationId { get; set; }
public bool DidComplete {get; set;}
}
我的状态机
public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
public State OperationPending { get; private set; }
public State Complete { get; private set; }
public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }
public ProcessOperationStateMachine()
{
InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
});
Event(() => OperationCompleteEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId);
});
Initially(
When(StartSagaRequestEvent)
.Then(context =>
{
context.Instance.CorrelationId = context.Data.CorrelationId;
context.Instance.Name = context.Data.Name;
context.Publish(new DoOperationRequestImpl
{
CorrelationId = context.Data.CorrelationId
});
})
.TransitionTo(OperationPending)
);
During(OperationPending,
When(OperationCompleteEvent)
.Then(context =>
{
// I'm just doing this for debugging
context.Instance.Name = "changed in operationComplete";
})
.ThenAsync(context => context.RespondAsync(new StartSagaResponse
{
CorrelationId = context.Data.CorrelationId,
DidComplete = true
}))
.Finalize());
}
我的消费者:
public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
await context.Publish<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
}
我如何在 Startup.cs 中的 DI 中连接事物
public void ConfigureServices(IServiceCollection services)
{
stateMachine = new ProcessOperationStateMachine();
SagaDbContextFactory factory = new SagaDbContextFactory();
EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
{
IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "do-operation", ep =>
{
ep.UseMessageRetry(c => c.Interval(2, 100));
ep.StateMachineSaga(stateMachine, repository);
ep.Durable = false;
});
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.Consumer(() => new DoOperationRequestConsumer());
ep.Durable = false;
});
}));
x.AddConsumer<DoOperationRequestConsumer>();
});
services.AddScoped<DoOperationRequestConsumer>();
services.AddScoped(p =>
p.GetRequiredService<IBusControl>()
.CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
new Uri("rabbitmq://localhost/do-operation?durable=false"),
TimeSpan.FromSeconds(30)));
}
并在我的控制器中发出请求:
public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
Name = "from the controller",
CorrelationId = guid
});
我看到的是我的状态机确实启动了。When(StartSagaRequestEvent) 确实被击中并发布了 DoOperationRequest 消息。DoOperationRequestConsumer 确实收到消息并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的状态机中的 IOperationCompleteEvent 没有被调用。当我查看数据库时,我可以看到我的 saga 实例是使用 guid 创建的,并且 CurrentState 设置为 OperationPending。当我查看我的 rabbitmq 管理仪表板时,我看到在 DoOperationRequestConsumer 执行 IOperationComplete 消息发布之后发布了一条消息。我只是没有看到状态机使用消费者发布的 IOperationComplete 消息。
我还尝试在消费者中显式使用“执行操作”队列:
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));
await sendEndpoint.Send<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
但仍然无法建立连接。
我整天都在努力解决这个问题,不确定我在这里错过了什么。如果有人可以就我可能做错的事情给我一些建议,我将非常感激,再次为文字墙感到抱歉,我知道它可以阅读,但我想清楚我在做什么。非常感谢!