4

我正在将 MassTransit 视为要在 Web 项目中使用的 ServiceBus 实现。

我正在使用请求/响应模式,并且看到消费者接收消息和响应与请求发布者处理响应之间存在很长的延迟;有时,似乎响应永远不会通过(让它运行了 10 分钟,响应仍然没有通过)。我看到句柄委托被调用并响应的唯一时间是在 30 秒超时时间之后并抛出超时异常;在这种情况下,处理程序委托上设置的断点被命中。

设置是标准事务 - 我有一个正在发布请求的 Web 应用程序,一个正在使用请求和发送响应的控制台应用程序,以便 Web 应用程序处理回调中的响应。

我正在使用 Castle Windsor,并且容器在 Web 项目中使用 WebActivator 进行了初始化:

[assembly: WebActivator.PreApplicationStartMethod(typeof(BootStrapper), "PreStart")]
[assembly: WebActivator.PostApplicationStartMethod(typeof(BootStrapper), "PostStart")]
[assembly: WebActivator.ApplicationShutdownMethodAttribute(typeof(BootStrapper), "Stop")]

namespace Web.App_Start
{
    public static class BootStrapper
    {
        internal static IWindsorContainer Container { get; private set; }

        public static void PreStart()
        {
            Container = new WindsorContainer().Install(FromAssembly.This());
        }

        public static void PostStart()
        {
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);

            ApiConfig.Configure(Container);
            MvcConfig.Configure(Container);
        }

        public static void Stop()
        {
            if (Container != null)
                Container.Dispose();
        }
    }
}

在 Web 应用程序项目(一个 ASP.NET Web API 项目)中,MassTransit 的 WindsorInstaller 看起来像

public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromThisAssembly().BasedOn<IConsumer>());

        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();            
            configurator.UseMulticastSubscriptionClient();

            configurator.ReceiveFrom("msmq://localhost/web");

            configurator.EnableMessageTracing();
            configurator.Subscribe(x => x.LoadFrom(container));
        });

        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}

在控制台应用程序项目中,WindsorInstaller 看起来像

public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromAssemblyContaining<BasicRequestCommandHandler>().BasedOn<IConsumer>());

        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();
            configurator.UseMulticastSubscriptionClient();

            configurator.ReceiveFrom("msmq://localhost/console");

            configurator.Subscribe(x => x.LoadFrom(container));
        });

        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}

我有一个ApiController使用以下 GET 操作方法

public class ExampleController : ApiController
{
    private readonly IServiceBus _bus;

    public HelloController(IServiceBus bus)
    {
        _bus = bus;
    }

    // GET api/hello?text={some text}
    public Task<IBasicResponseCommand> Get(string text)
    {
        var command = new BasicRequestCommand {Text = text};

        var tcs = new TaskCompletionSource<IBasicResponseCommand>();

        _bus.PublishRequest(command, c =>
        {
            c.Handle<IBasicResponseCommand>(r =>
            {
                tcs.SetResult(r);
            });
        });

        return tcs.Task;
    }
}

BasicRequestCommand 和 BasicResponseCommand 看起来像这样

public interface IBasicRequestCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}

public class BasicRequestCommand :
    CorrelatedBy<Guid>, IBasicRequestCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }

    public BasicRequestCommand()
    {
        CorrelationId = Guid.NewGuid();
    }
}

public interface IBasicResponseCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}

public class BasicResponseCommand :
    CorrelatedBy<Guid>, IBasicResponseCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }
}

以及响应控制台应用程序中的 BasicRequestCommand 的处理程序:

public class BasicRequestCommandHandler : Consumes<IBasicRequestCommand>.Context
{
    public void Consume(IConsumeContext<IBasicRequestCommand> context)
    {
        Console.Out.WriteLine("received message text " + context.Message.Text);

        context.Respond(new BasicResponseCommand { Text = "Hello " + context.Message.Text, CorrelationId = context.Message.CorrelationId });
    }
}

我预计所有这些都在本地运行,请求/响应最多大约几秒钟。我在配置中遗漏了什么吗?

此外,我想将 MassTransit 连接到 log4net。我正在使用 Windsor 的 log4net 日志记录工具,并且在 web.config 中有一个 log4net 部分。这对于ILoggerWindsor 提供的实现(以及 NHibernate 日志记录)都可以正常工作,但是从文档中不清楚如何配置 MassTransit 以将其用于日志记录。有任何想法吗?

4

3 回答 3

4

正如 Andrei Volkov 和 Chris Patterson在 MassTransit google group 上讨论​​的那样,这个问题似乎源于将 MassTransit切换到 using SynchronizationContext,由于某种原因无法按预期工作。

目前,一种解决方法似乎是转换到异步 MassTransit 请求,或者返回到不使用违规的 v2.1.1 SynchronizationContext

(如果没有其他人先这样做,将在此处发布有关此问题的更新以供后代使用。)

于 2012-08-30T17:15:51.143 回答
3

ASP.NET 中请求/响应的响应超时问题已在 2.6.2 版本中修复。 https://groups.google.com/d/topic/masstransit-discuss/oC1FOe6KsAU/discussion

于 2012-09-14T21:30:23.983 回答
1

当您使用 MultiCastSubscriptionClient 时,您必须SetNetwork(NETWORK_KEY)在每台机器上调用(对 NETWORK_KEY 使用相同的值)。此外,所有参与的机器都需要在同一个子网上 - 请参阅http://masstransit.readthedocs.org/en/latest/overview/subscriptions.html#msmq-multicast上的文档

对于连接 log4net,这取决于您使用的版本,但在最新版本中,您包含 MassTransit.Log4NetIntegration 程序集,然后调用cfg.UseLog4Net();您的服务总线配置。

如果您仍然卡住,您可以在https://groups.google.com/forum/?fromgroups#!forum/masstransit-discuss询问 MT 邮件列表

于 2012-08-15T16:29:05.020 回答