1

如标题所示,我想问是否可以在 BeforeTransportMessage 事件中提交事务,例如,如果 TransportMessage 中出现特定标头,我可以将其包装在另一条消息中并发送到其他地方并阻止 Rebus 进一步处理。

我没有找到任何使用该事件的现实生活示例,您是否可以分享一些东西?

提前致谢 :)

4

1 回答 1

1

如果您可以安全地反序列化所有传入消息,那么您可以执行以下操作:

如果您尝试实现一个检查所有传入消息的过滤器,我建议您创建一个“包罗万象”的消息处理程序,即IHandleMessages<object>. 我们就叫它吧CatchAllHandler,虽然我相信你能想出一个更好的名字;)

那么你也能

Configure.With(yourFavoriteContainerAdapter)
    .UseMsmq(...)
    .SpecifyOrderOfHandlers(o => o.First<CatchAllHandler>())
    .Create()
    .Start();

为了CatchAllHandler在任何其他处理程序之前将所有传入消息发送给您。由于它实现IHandleMessage<object>了 ,它能够处理所有传入的消息。然后你CatchAllHandler可以做这样的事情:

public class CatchAllHandler : IHandleMessages<object> {
    readonly IMessageContext context;
    readonly IBus bus;

    public CatchAllHandler(IMessageContext context, IBus bus) {
        this.context = context;
        this.bus = bus;
    }

    public void Handle(object message) {
        if (MessageHasSpecialHeader(context.Headers)) {
            // forward the current transport message in its entirety
            bus.Advanced.Routing.ForwardCurrentMessage("somewhereElse");

            // abort further processing of the message (i.e. do not
            // dispatch to subsequent handlers in the pipeline)
            context.Abort();
        }
    }

    bool MessageHasSpecialHeader(IDictionary<string, object> headers) {
        // ... look for that special header down here
    }
}

但是,如果您不知道传入是否byte[]对您有好处,则需要做其他事情。尽管在没有 Rebus 的其余部分的情况下使用 Rebus 的传输实现是相当简单的,但是您必须自己实现线程模型。

以下代码是一个示例(完全从内存中编写,因此您可能需要更正一些小位),如果您使用 MSMQ,则此类工作线程可能看起来像:

volatile bool _keepWorking = true;

public void ThreadLoop() {
    using(var transport = new MsmqMessageQueue("inputQueueName")) {
        while(_keepWorking) {
            using(var scope = new TransactionScope()) 
            using(var context = new AmbientTransactionContext())
            {
                var message = transport.ReceiveMessage(context);
                if (message == null) {
                    Thread.Sleep(1000); //< don't punish queues too much
                    continue;
                }

                var destination = GetDestinationBasedOnWhatever(message);

                transport.Send(destination, message.ToForwardableMessage());

                scope.Complete();
            }
        }
    }
}

但是,您需要一个单独的队列才能使这种特殊的基于内容的路由器工作。你认为这对你有用吗?

于 2014-12-17T20:42:11.380 回答