我尝试使用持久化 Saga 实例RedisSagaRepository
;我想在负载平衡设置中运行 Saga,所以我不能使用InMemorySagaRepository
. 然而,在我切换之后,我注意到消费者发布的一些事件没有被 Saga 处理。我检查了队列,没有看到任何消息。
我注意到,当消费者几乎没有时间处理命令和发布事件时,它可能会发生。InMemorySagaRepository
如果我使用或添加将不会出现此Task.Delay()
问题Consumer.Consume()
我使用不正确吗?
另外,如果我想在负载平衡设置中运行 Saga,并且如果 Saga 需要使用字典发送多个相同类型的命令来跟踪完整性(类似于处理多个事件的状态转换中的逻辑)。当多个消费者同时发布事件时,如果两个 Sagas 同时处理两个不同的事件,我会出现竞争条件吗?在这种情况下,State 对象中的 Dictionary 是否会正确设置?
代码可在此处获得
SagaService.ConfigureSagaEndPoint()
InMemorySagaRepository
是我在和之间切换的地方RedisSagaRepository
private void ConfigureSagaEndPoint(IRabbitMqReceiveEndpointConfigurator endpointConfigurator)
{
var stateMachine = new MySagaStateMachine();
try
{
var redisConnectionString = "192.168.99.100:6379";
var redis = ConnectionMultiplexer.Connect(redisConnectionString);
///If we switch to RedisSagaRepository and Consumer publish its response too quick,
///It seems like the consumer published event reached Saga instance before the state is updated
///When it happened, Saga will not process the response event because it is not in the "Processing" state
//var repository = new RedisSagaRepository<SagaState>(() => redis.GetDatabase());
var repository = new InMemorySagaRepository<SagaState>();
endpointConfigurator.StateMachineSaga(stateMachine, repository);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
LeafConsumer.Consume 是我们添加 Task.Delay() 的地方
public class LeafConsumer : IConsumer<IConsumerRequest>
{
public async Task Consume(ConsumeContext<IConsumerRequest> context)
{
///If MySaga project is using RedisSagaRepository, uncomment await Task.Delay() below
///Otherwise, it seems that the Publish message from Consumer will not be processed
///If using InMemorySagaRepository, code will work without needing Task.Delay
///Maybe I am doing something wrong here with these projects
///Or in real life, we probably have code in Consumer that will take a few milliseconds to complete
///However, we cannot predict latency between Saga and Redis
//await Task.Delay(1000);
Console.WriteLine($"Consuming CorrelationId = {context.Message.CorrelationId}");
await context.Publish<IConsumerProcessed>(new
{
context.Message.CorrelationId,
});
}
}