我正在将 MassTransit 视为要在 Web 项目中使用的 ServiceBus 实现。
我正在使用请求/响应模式,并且看到消费者接收消息和响应与请求发布者处理响应之间存在很长的延迟;有时,似乎响应永远不会通过(让它运行了 10 分钟,响应仍然没有通过)。我看到句柄委托被调用并响应的唯一时间是在 30 秒超时时间之后并抛出超时异常;在这种情况下,处理程序委托上设置的断点被命中。
设置是标准事务 - 我有一个正在发布请求的 Web 应用程序,一个正在使用请求和发送响应的控制台应用程序,以便 Web 应用程序处理回调中的响应。
我正在使用 Castle Windsor,并且容器在 Web 项目中使用 WebActivator 进行了初始化:
[assembly: WebActivator.PreApplicationStartMethod(typeof(BootStrapper), "PreStart")]
[assembly: WebActivator.PostApplicationStartMethod(typeof(BootStrapper), "PostStart")]
[assembly: WebActivator.ApplicationShutdownMethodAttribute(typeof(BootStrapper), "Stop")]
namespace Web.App_Start
{
public static class BootStrapper
{
internal static IWindsorContainer Container { get; private set; }
public static void PreStart()
{
Container = new WindsorContainer().Install(FromAssembly.This());
}
public static void PostStart()
{
FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
RouteConfig.RegisterRoutes(RouteTable.Routes);
BundleConfig.RegisterBundles(BundleTable.Bundles);
ApiConfig.Configure(Container);
MvcConfig.Configure(Container);
}
public static void Stop()
{
if (Container != null)
Container.Dispose();
}
}
}
在 Web 应用程序项目(一个 ASP.NET Web API 项目)中,MassTransit 的 WindsorInstaller 看起来像
public class MassTransitInstaller : IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
container.Register(AllTypes.FromThisAssembly().BasedOn<IConsumer>());
var bus = ServiceBusFactory.New(configurator =>
{
configurator.UseMsmq();
configurator.VerifyMsmqConfiguration();
configurator.UseMulticastSubscriptionClient();
configurator.ReceiveFrom("msmq://localhost/web");
configurator.EnableMessageTracing();
configurator.Subscribe(x => x.LoadFrom(container));
});
container.Register(Component.For<IServiceBus>().Instance(bus));
}
}
在控制台应用程序项目中,WindsorInstaller 看起来像
public class MassTransitInstaller : IWindsorInstaller
{
public void Install(IWindsorContainer container, IConfigurationStore store)
{
container.Register(AllTypes.FromAssemblyContaining<BasicRequestCommandHandler>().BasedOn<IConsumer>());
var bus = ServiceBusFactory.New(configurator =>
{
configurator.UseMsmq();
configurator.VerifyMsmqConfiguration();
configurator.UseMulticastSubscriptionClient();
configurator.ReceiveFrom("msmq://localhost/console");
configurator.Subscribe(x => x.LoadFrom(container));
});
container.Register(Component.For<IServiceBus>().Instance(bus));
}
}
我有一个ApiController
使用以下 GET 操作方法
public class ExampleController : ApiController
{
private readonly IServiceBus _bus;
public HelloController(IServiceBus bus)
{
_bus = bus;
}
// GET api/hello?text={some text}
public Task<IBasicResponseCommand> Get(string text)
{
var command = new BasicRequestCommand {Text = text};
var tcs = new TaskCompletionSource<IBasicResponseCommand>();
_bus.PublishRequest(command, c =>
{
c.Handle<IBasicResponseCommand>(r =>
{
tcs.SetResult(r);
});
});
return tcs.Task;
}
}
BasicRequestCommand 和 BasicResponseCommand 看起来像这样
public interface IBasicRequestCommand
{
Guid CorrelationId { get; set; }
string Text { get; set; }
}
public class BasicRequestCommand :
CorrelatedBy<Guid>, IBasicRequestCommand
{
public Guid CorrelationId { get; set; }
public string Text { get; set; }
public BasicRequestCommand()
{
CorrelationId = Guid.NewGuid();
}
}
public interface IBasicResponseCommand
{
Guid CorrelationId { get; set; }
string Text { get; set; }
}
public class BasicResponseCommand :
CorrelatedBy<Guid>, IBasicResponseCommand
{
public Guid CorrelationId { get; set; }
public string Text { get; set; }
}
以及响应控制台应用程序中的 BasicRequestCommand 的处理程序:
public class BasicRequestCommandHandler : Consumes<IBasicRequestCommand>.Context
{
public void Consume(IConsumeContext<IBasicRequestCommand> context)
{
Console.Out.WriteLine("received message text " + context.Message.Text);
context.Respond(new BasicResponseCommand { Text = "Hello " + context.Message.Text, CorrelationId = context.Message.CorrelationId });
}
}
我预计所有这些都在本地运行,请求/响应最多大约几秒钟。我在配置中遗漏了什么吗?
此外,我想将 MassTransit 连接到 log4net。我正在使用 Windsor 的 log4net 日志记录工具,并且在 web.config 中有一个 log4net 部分。这对于ILogger
Windsor 提供的实现(以及 NHibernate 日志记录)都可以正常工作,但是从文档中不清楚如何配置 MassTransit 以将其用于日志记录。有任何想法吗?