0

使用 Rhino 服务总线。我有一个处理处理的后端应用程序,还有另一个将消息发布到后端的应用程序(客户端 UI)。我在后端有一个 Saga,在此期间我希望 saga 向自己发布消息,将处理分解为多个可以在自己的线程上运行的较小任务。问题是,如果通过 Orchestrates 接口订阅消息,它们总是会被丢弃。我可以使用 ConsumerOf 订阅不同的类,并且消费者会收到消息。

namespace Sagas
{
public class MoveJobSaga: ISaga<MoveJobState>,
    InitiatedBy<TriggerMoveJobCommand>,
    Orchestrates<TriggerMoveTerminalCommand>
{
    private readonly IServiceBus _bus;
    private readonly ITerminalFilesService _tfService;

    public MoveJobSaga(IServiceBus bus)
    {
        _bus = bus;
        State = new MoveJobState();
    }

    public MoveJobState State { get; set; }
    public Guid Id { get; set; }
    public bool IsCompleted { get; set; }

    public void Consume(TriggerMoveJobCommand message)
    {
        State.TerminalsToProcess = message.Terminals.Count();
        State.JobId = message.JobId;
        foreach (var terminal in message.Terminals)
        {
            _bus.Publish(new TriggerMoveTerminalCommand() 
            { 
                CorrelationId = message.CorrelationId,
                Name = terminal.Name
            });
        }
    }

    public void Consume(TriggerMoveTerminalCommand message)
    {
        var result = _tfService.MoveTerminalFiles(message.SourceTifDir, message.TargetTifDir, message.SourceDatDir, message.TargetDatDir);
        State.TerminalsProcessed++;

        if (State.TerminalsToProcess == State.TerminalsProcessed)
        {
            _bus.Publish(new MoveJobCompletedEvent() 
            { 
               Success = State.Success, 
               JobId = State.JobId });
        }
    }
}

public class MoveJobState
{
    public MoveJobState()
    {
        Success = true;
    }
    public int TerminalsToProcess { get; set; }
    public int TerminalsProcessed { get; set; }
    public int JobId { get; set; }
    public bool Success { get; set; }
}
}

主机配置:

<rhino.esb>
<bus threadCount="1" numberOfRetries="5" endpoint="msmq://localhost/myapp.host" />
<messages />
</rhino.esb>

引导:

public class HostBootStrapper: StructureMapBootStrapper
{
    protected override void ConfigureContainer()
    {
        base.ConfigureContainer();
        Container.Configure(sm =>
        {
            sm.For<ISagaPersister<MoveJobSaga>>().Use<InMemorySagaPersister<MoveJobSaga>>();
            sm.Scan(x =>
            {
                x.TheCallingAssembly();
                x.WithDefaultConventions();
            });
        });
    }
}
4

1 回答 1

0

我需要将 ISagaPersister 注册为单例:

            sm.For<ISagaPersister<MoveJobSaga>>()
                .Singleton()
                .Use<InMemorySagaPersister<MoveJobSaga>>();

此外,我不得不将 saga 使用的所有消息的发布移到 saga 之外。所以我创建了一个服务类来触发启动消息,然后是作业中的每条消息。

于 2014-01-20T19:25:57.950 回答