3

我正在尝试设置 MassTransit 请求/响应方案。问题是消息永远不会到达消费者。我在 PublishRequest 上收到“等待响应超时”错误。日志文件中没有显示其他错误。正在 msmq 中创建消息。

异常信息:

异常类型:TargetInvocationException 异常消息:调用的目标已抛出异常。在 System.RuntimeTypeHandle.CreateInstance(RuntimeType 类型,Boolean publicOnly,Boolean noCheck,Boolean& canBeCached,RuntimeMethodHandleInternal& ctor,Boolean& bNeedSecurityCheck)在 System.RuntimeType.CreateInstanceSlow(Boolean publicOnly,Boolean skipCheckThis,Boolean fillCache,StackCrawlMark 和 stackMark)在 System.RuntimeType.CreateInstanceDefaultCtor (Boolean publicOnly, Boolean skipCheckThis, Boolean fillCache, StackCrawlMark & stackMark)
在 System.Activator.CreateInstance(Type type, Boolean nonPublic) 在 System.Activator.CreateInstance(Type type) 在 System.Web.Mvc.DefaultControllerFactory.DefaultControllerActivator.Create(RequestContext requestContext, Type controllerType)

等待响应超时,RequestId: 08cfa243-4a88-ba3a-20cf-307f54910000 at MassTransit.RequestResponse.RequestImpl 1.Wait() in d:\BuildAgent-03\work\8d1373c869590c5b\src\MassTransit\RequestResponse\RequestImpl.cs:line 124 at MassTransit.RequestResponseExtensions.PublishRequest[TRequest](IServiceBus bus, TRequest message, Action1 configureCallback) in d:\BuildAgent-03\work\8d1373c869590c5b\src\MassTransit\RequestResponseExtensions.cs:line 31 at Producer。 c:\Users\rick\Documents\Visual Studio 2012\Projects\ConsumerTest1\Producer.Website\Controllers\AccountController.cs:line 56 中的 Website.Controllers.AccountController..ctor()

生产者设置:

            _bus = ServiceBusFactory.New(sbc =>
            {
                sbc.UseMsmq();
                sbc.VerifyMsmqConfiguration();

                sbc.UseMulticastSubscriptionClient();
                sbc.SetNetwork("Test");

               sbc.ReceiveFrom("msmq://localhost/consumer_test_1");

            });

生产者发送消息:

                var message = new AccountNewMessage()
                {
                    CorrelationId = CombGuid.Generate(),
                    UserName = “blah blah”,
                    Password = “yada yada”
                };

                this._bus.PublishRequest(message, r =>
                    {
                        r.SetTimeout(30.Seconds());

                        r.Handle<AccountNewMessageResponse>(m =>
                            {
                                var response = m;
                            });
                    });

消费者设置:

        this.bus = ServiceBusFactory.New(sbc =>
            {
                sbc.UseMsmq();
                sbc.VerifyMsmqConfiguration();

                sbc.UseMulticastSubscriptionClient();
                sbc.SetNetwork("Test");

                sbc.ReceiveFrom("msmq://localhost/consumer_test_2");

                sbc.Subscribe(subs => subs.Instance(new AccountNewMessageConsumer()));
            });

消费者:

public class AccountNewMessageConsumer : Consumes<AccountNewMessage>.Context
{
    public void Consume(IConsumeContext<AccountNewMessage> context)
    {
        context.Respond(new AccountNewMessageResponse()
        {
            CorrelationId = context.Message.CorrelationId,
            ErrorCode = "1",
            UserId = new Random().Next(1, 10000).ToString()
        });
    }
}

留言:

[Serializable]
public class AccountNewMessage : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
}

[Serializable]
public class AccountNewMessageResponse : CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; set; }
    public string UserId { get; set; }
    public string ErrorCode { get; set; }
}

我究竟做错了什么?谢谢你。

4

1 回答 1

0

您必须告诉 MassTransit 如何存储它的订阅信息(对于 MSMQ)。

有两种选择:MSMQ Multicast,将订阅信息保存在内存中,并将MSMQ Runtime Services订阅信息存储在数据库中,因此它在会话之间保持不变。

您选择使用哪个取决于您是需要永久订阅还是临时订阅 - 来自文档

永久订阅表示您希望保留的订阅,即使您的进程已关闭(也许您正在进行升级并且不想错过任何消息)。如果您不在乎关闭时是否错过消息,则可以使用临时订阅。

没有理由在开发/PoC 期间不能使用 Multicast 并稍后切换到 RuntimeServices。文档页面还显示了如何在配置中设置每个,当然使用 RuntimeServices 您还必须设置数据库。

(请注意,多播可能需要一段时间才能自行设置,因此您可能需要暂停测试系统以预热,然后再开始通过它发送消息。)

编辑:我添加了几个验证订阅和消费者的例程:您将在设置订阅时调用第一个(即在 sbc.Subscribe 期间),在配置总线后调用第二个。也许他们会帮助找到问题?

private void ValidateSubscriptions(Configurator configurator)
{
    var errors = configurator.Validate();

    Console.WriteLine("Subscription Validation");
    Console.WriteLine("-----------------------");

    foreach (var err in errors.Where(e => string.IsNullOrEmpty(e.Value) == false))
    {
        Console.WriteLine("Type: {0} Message: {1} Key: {2} Value: {3}",
            err.Disposition, err.Message, err.Key, err.Value);
    }
}

private static void ValidateBus(Configurator bus)
{
    var errors = bus.Validate();

    Console.WriteLine("Consumer Validation");
    Console.WriteLine("-------------------");

    foreach (var err in errors.Where(e => string.IsNullOrEmpty(e.Value) == false))
    {
        Console.WriteLine("Type: {0} Message: {1} Key: {2} Value: {3}",
            err.Disposition, err.Message, err.Key, err.Value);
    }
}    
于 2012-12-06T17:13:41.983 回答