使用 Rhino 服务总线。我有一个处理处理的后端应用程序,还有另一个将消息发布到后端的应用程序(客户端 UI)。我在后端有一个 Saga,在此期间我希望 saga 向自己发布消息,将处理分解为多个可以在自己的线程上运行的较小任务。问题是,如果通过 Orchestrates 接口订阅消息,它们总是会被丢弃。我可以使用 ConsumerOf 订阅不同的类,并且消费者会收到消息。
namespace Sagas
{
public class MoveJobSaga: ISaga<MoveJobState>,
InitiatedBy<TriggerMoveJobCommand>,
Orchestrates<TriggerMoveTerminalCommand>
{
private readonly IServiceBus _bus;
private readonly ITerminalFilesService _tfService;
public MoveJobSaga(IServiceBus bus)
{
_bus = bus;
State = new MoveJobState();
}
public MoveJobState State { get; set; }
public Guid Id { get; set; }
public bool IsCompleted { get; set; }
public void Consume(TriggerMoveJobCommand message)
{
State.TerminalsToProcess = message.Terminals.Count();
State.JobId = message.JobId;
foreach (var terminal in message.Terminals)
{
_bus.Publish(new TriggerMoveTerminalCommand()
{
CorrelationId = message.CorrelationId,
Name = terminal.Name
});
}
}
public void Consume(TriggerMoveTerminalCommand message)
{
var result = _tfService.MoveTerminalFiles(message.SourceTifDir, message.TargetTifDir, message.SourceDatDir, message.TargetDatDir);
State.TerminalsProcessed++;
if (State.TerminalsToProcess == State.TerminalsProcessed)
{
_bus.Publish(new MoveJobCompletedEvent()
{
Success = State.Success,
JobId = State.JobId });
}
}
}
public class MoveJobState
{
public MoveJobState()
{
Success = true;
}
public int TerminalsToProcess { get; set; }
public int TerminalsProcessed { get; set; }
public int JobId { get; set; }
public bool Success { get; set; }
}
}
主机配置:
<rhino.esb>
<bus threadCount="1" numberOfRetries="5" endpoint="msmq://localhost/myapp.host" />
<messages />
</rhino.esb>
引导:
public class HostBootStrapper: StructureMapBootStrapper
{
protected override void ConfigureContainer()
{
base.ConfigureContainer();
Container.Configure(sm =>
{
sm.For<ISagaPersister<MoveJobSaga>>().Use<InMemorySagaPersister<MoveJobSaga>>();
sm.Scan(x =>
{
x.TheCallingAssembly();
x.WithDefaultConventions();
});
});
}
}