如标题所示,我想问是否可以在 BeforeTransportMessage 事件中提交事务,例如,如果 TransportMessage 中出现特定标头,我可以将其包装在另一条消息中并发送到其他地方并阻止 Rebus 进一步处理。
我没有找到任何使用该事件的现实生活示例,您是否可以分享一些东西?
提前致谢 :)
如标题所示,我想问是否可以在 BeforeTransportMessage 事件中提交事务,例如,如果 TransportMessage 中出现特定标头,我可以将其包装在另一条消息中并发送到其他地方并阻止 Rebus 进一步处理。
我没有找到任何使用该事件的现实生活示例,您是否可以分享一些东西?
提前致谢 :)
如果您可以安全地反序列化所有传入消息,那么您可以执行以下操作:
如果您尝试实现一个检查所有传入消息的过滤器,我建议您创建一个“包罗万象”的消息处理程序,即IHandleMessages<object>
. 我们就叫它吧CatchAllHandler
,虽然我相信你能想出一个更好的名字;)
那么你也能
Configure.With(yourFavoriteContainerAdapter)
.UseMsmq(...)
.SpecifyOrderOfHandlers(o => o.First<CatchAllHandler>())
.Create()
.Start();
为了CatchAllHandler
在任何其他处理程序之前将所有传入消息发送给您。由于它实现IHandleMessage<object>
了 ,它能够处理所有传入的消息。然后你CatchAllHandler
可以做这样的事情:
public class CatchAllHandler : IHandleMessages<object> {
readonly IMessageContext context;
readonly IBus bus;
public CatchAllHandler(IMessageContext context, IBus bus) {
this.context = context;
this.bus = bus;
}
public void Handle(object message) {
if (MessageHasSpecialHeader(context.Headers)) {
// forward the current transport message in its entirety
bus.Advanced.Routing.ForwardCurrentMessage("somewhereElse");
// abort further processing of the message (i.e. do not
// dispatch to subsequent handlers in the pipeline)
context.Abort();
}
}
bool MessageHasSpecialHeader(IDictionary<string, object> headers) {
// ... look for that special header down here
}
}
但是,如果您不知道传入是否byte[]
对您有好处,则需要做其他事情。尽管在没有 Rebus 的其余部分的情况下使用 Rebus 的传输实现是相当简单的,但是您必须自己实现线程模型。
以下代码是一个示例(完全从内存中编写,因此您可能需要更正一些小位),如果您使用 MSMQ,则此类工作线程可能看起来像:
volatile bool _keepWorking = true;
public void ThreadLoop() {
using(var transport = new MsmqMessageQueue("inputQueueName")) {
while(_keepWorking) {
using(var scope = new TransactionScope())
using(var context = new AmbientTransactionContext())
{
var message = transport.ReceiveMessage(context);
if (message == null) {
Thread.Sleep(1000); //< don't punish queues too much
continue;
}
var destination = GetDestinationBasedOnWhatever(message);
transport.Send(destination, message.ToForwardableMessage());
scope.Complete();
}
}
}
}
但是,您需要一个单独的队列才能使这种特殊的基于内容的路由器工作。你认为这对你有用吗?