9

嗨,我正在尝试通过 EasyNetQ 通过 RabbitMQ 发送一个简单的对象。我在订阅端反序列化该对象时遇到问题。任何人都可以向我展示这是如何工作的样本。请记住,要发送的对象是在它自己的项目中定义的,而不是在发布者和订阅者之间共享。这是我的示例,也许您可​​以告诉我它有什么问题?

方案一:

class ProgramA
{
    static void Main(string[] args)
    {
        using (var bus = RabbitHutch.CreateBus("host=localhost"))
        {
            Console.WriteLine("Press any key to send the message");
            Console.ReadKey();
            bus.Publish(new MessageA { Text = "Hello World" });
            Console.WriteLine("Press any key to quit");
            Console.ReadKey();
        }
    }

    public class MessageA
    {
        public string Text { get; set; }
    }
}

方案 B:

class ProgramB
{
    static void Main(string[] args)
    {
        using (var bus = RabbitHutch.CreateBus("host=localhost"))
        {
            bus.Subscribe<MessageB>("", HandleClusterNodes);
            Console.WriteLine("Press any key to quit");
            Console.ReadKey();
        }
    }

    private static void HandleClusterNodes(MessageB obj)
    {
        Console.WriteLine(obj.Text);
    }

    [Queue("TestMessagesQueue", ExchangeName = "EasyNetQSample.ProgramA+MessageA:EasyNetQSample")]
    public class MessageB
    {
        public string Text { get; set; }
    }
}

这是我收到的错误:

DEBUG: HandleBasicDeliver on consumer: f9ded52d-039c-411a-9b9f-5c8ee3301854, deliveryTag: 1
DEBUG: Received
        RoutingKey: ''
        CorrelationId: 'ec41faea-a0c8-4ffd-8163-2cbf85d45fcd'
        ConsumerTag: 'f9ded52d-039c-411a-9b9f-5c8ee3301854'
        DeliveryTag: 1
        Redelivered: False
ERROR: Exception thrown by subscription callback.
        Exchange:    'EasyNetQSample.ProgramA+MessageA:EasyNetQSample'
        Routing Key: ''
        Redelivered: 'False'
Message:
{"Text":"Hello World"}
BasicProperties:
ContentType=NULL, ContentEncoding=NULL, Headers=[], DeliveryMode=2, Priority=0, CorrelationId=ec41faea-a0c8-4ffd-8163-2cbf85d45fcd, ReplyTo=NULL, Expiration=NULL, MessageId=NULL, Timestamp=0, Type=EasyNetQSample.ProgramA+MessageA:EasyNetQSample, UserId=NULL, AppId=NULL, ClusterId=NULL
Exception:
System.AggregateException: One or more errors occurred. ---> EasyNetQ.EasyNetQException: Cannot find type EasyNetQSample.ProgramA+MessageA:EasyNetQSample
   at EasyNetQ.TypeNameSerializer.DeSerialize(String typeName)
   at EasyNetQ.DefaultMessageSerializationStrategy.DeserializeMessage(MessageProperties properties, Byte[] body)
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass19.<Consume>b__18(Byte[] body, MessageProperties properties, MessageReceivedInfo messageReceivedInfo)
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1e.<Consume>b__1d(Byte[] body, MessageProperties properties, MessageReceivedInfo receviedInfo)
   at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandler(ConsumerExecutionContext context)
   --- End of inner exception stack trace ---
---> (Inner Exception #0) EasyNetQ.EasyNetQException: Cannot find type EasyNetQSample.ProgramA+MessageA:EasyNetQSample
   at EasyNetQ.TypeNameSerializer.DeSerialize(String typeName)
   at EasyNetQ.DefaultMessageSerializationStrategy.DeserializeMessage(MessageProperties properties, Byte[] body)
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass19.<Consume>b__18(Byte[] body, MessageProperties properties, MessageReceivedInfo messageReceivedInfo)
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1e.<Consume>b__1d(Byte[] body, MessageProperties properties, MessageReceivedInfo receviedInfo)
   at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandler(ConsumerExecutionContext context)<---

我需要做什么才能正确反序列化MessageA

4

3 回答 3

14

据我所知,EasyNetQ的默认设置要求序列化对象的类型在应用程序之间保持一致。例如,您可以像 String 这样轻松地发送任何已知的 .NET 类型:

 bus.Publish<String>("Excellent.");

并且对这两个项目都很满意。

如果你把它放到一个公共库(dll)中,你可以使用你自己的消息。由于您特别提到它们位于不同的项目中,因此我建议您自己序列化和转换对象。

EasyNetQ 内部使用Newtonsoft Json.NET来序列化这样的对象。如您所见,您的消息已被序列化为:

消息:{"Text":"Hello World"}

要自己执行此操作,您仍然需要添加对 Json.NET 的引用,因为EasyNetQ 使用 ilrepack 隐藏了此引用

这应该有效:

bus.Publish<string>(JsonConvert.SerializeObject(new MessageA { Text = "Hello World" }));

bus.Subscribe<string>("", HandleClusterNodes);

private static void HandleClusterNodes(string obj)
{
    var myMessage = (MessageB)JsonConvert.DeserializeObject<MessageB>(obj);
    Console.WriteLine(myMessage.Text);
}

但是您将失去基于属性的路由,并且可能想要修改您的方法。

如果您想继续使用基本方法,可以这样设置主题:

bus.Publish<string>(JsonConvert.SerializeObject(new MessageA { Text = "Hello World" }), "topic.name");

bus.Subscribe<string>("", HandleClusterNodes, new Action<EasyNetQ.FluentConfiguration.ISubscriptionConfiguration>( o => o.WithTopic("topic.name")));

但是要完全控制,您需要使用Advanced API

var yourMessage = new Message<string>(JsonConvert.SerializeObject(new MessageA { Text = "Hello World" }));
bus.Advanced.Publish<string>(new Exchange("YourExchangeName"), "your.routing.key", false, false, yourMessage);

在订户部分:

IQueue yourQueue = bus.Advanced.QueueDeclare("AnotherTestMessagesQueue");
IExchange yourExchange = bus.Advanced.ExchangeDeclare("YourExchangeName", ExchangeType.Topic);
bus.Advanced.Bind(yourExchange, yourQueue, "your.routing.key");
bus.Advanced.Consume<string>(yourQueue, (msg, info) => HandleClusterNodes(msg.Body));

这与原始的 RabbitMQ C# 客户端 API 几乎相同。


详细分析:

主要问题是这个异常:

EasyNetQ.EasyNetQException:找不到类型 EasyNetQSample.ProgramA+MessageA:EasyNetQSample

这是由 EasyNetQ 抛出的,因为它在端点上找不到特殊类。

如果我们查看TypeNameSerializer.cs的源代码,您会看到

var type = Type.GetType(nameParts[0] + ", " + nameParts[1]);
            if (type == null)
            {
                throw new EasyNetQException(
                    "Cannot find type {0}",
                    typeName);
            }

这是它试图在第二个项目中找到 EasyNetQSample.ProgramA.Message A类型的地方,而它只知道 EasyNetQSample.ProgramB.Message B

或者,您可以推出自己的自定义ISerializer或将 ITypeNameSerializer 放入默认序列化器中,但我没有尝试过。

于 2014-10-17T00:01:09.010 回答
3

Furkan 是正确的,您的问题是订阅者需要访问MessageA您的发布者定义的类型,以便它可以将消息反序列化为该类型。来自EasyNetQ 文档

当消息被序列化时,EasyNetQ 将消息类型名称存储在消息属性的 Type 属性中。此元数据与您的消息一起发送给任何可以使用它反序列化消息的订阅者。

这相当于发布者和消费者之间的紧密共享合同。如果你想放松它,那么你可以做几件事:

  • 您可以基于接口(例如IMessage,您可以从中派生MessageA)发布和订阅。您仍然需要访问该MessageA类型才能转换收到的消息,但您可以在不指定特定派生类型的情况下发布和订阅。

  • 您可以创建一个单一的、共享的“容器”类型(例如MessageContainer),然后将您的类型序列化/反序列化为容器类型的实例,如 XML、JSON 等。您的订阅者可以将数据从容器中提取出来并根据需要进行解析。您甚至可以在容器标头中包含架构或版本信息,以向订阅者提供有关如何解析数据的一些提示。重点是他们永远不必将其转换为已定义的类型,因此不需要访问一堆类型,只需访问类型,MessageContainer这样他们就可以获取序列化的内容。

于 2014-10-27T18:17:43.743 回答
2

MessageA必须在两个应用程序的范围内

bus.Publish<MessageA>("Excellent."); 
于 2015-03-04T20:20:24.350 回答