5

我有一个简单的 Azure WebJobs ServiceBusTrigger 看起来像

public static async void ProcessQueueMessage([ServiceBusTrigger("myqueuename")] String json, TextWriter log) { ... }

不幸的是,它无法将 JSON 反序列化为 XML(不足为奇)。我检查了有效载荷并确认它只是一个 UTF-8 编码的字节数组。我有两个问题。

  1. 为什么假设我的字符串是 XML?
  2. 我如何告诉它不,没有 XML,只有一个字符串?

堆栈跟踪:

System.InvalidOperationException: Exception binding parameter 'json' ---> System.Runtime.Serialization.SerializationException: There was an error deserializing the object of type System.String. The input source is not correctly formatted. ---> System.Xml.XmlException: The input source is not correctly formatted.
 at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
 at System.Xml.XmlBufferReader.ReadValue(XmlBinaryNodeType nodeType, ValueHandle value)
 at System.Xml.XmlBinaryReader.ReadNode()
 at System.Xml.XmlBinaryReader.Read()
 at System.Xml.XmlBaseReader.IsStartElement()
 at System.Xml.XmlBaseReader.IsStartElement(XmlDictionaryString localName, XmlDictionaryString namespaceUri)
 at System.Runtime.Serialization.XmlReaderDelegator.IsStartElement(XmlDictionaryString localname, XmlDictionaryString ns)
 at System.Runtime.Serialization.XmlObjectSerializer.IsRootElement(XmlReaderDelegator reader, DataContract contract, XmlDictionaryString name, XmlDictionaryString ns)
 at System.Runtime.Serialization.DataContractSerializer.InternalIsStartObject(XmlReaderDelegator reader)
 at System.Runtime.Serialization.DataContractSerializer.InternalReadObject(XmlReaderDelegator xmlReader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 --- End of inner exception stack trace ---
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.DataContractSerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
 at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlReader reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName)
 at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
 at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlDictionaryReader reader)
 at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(Stream stream)
 at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T](XmlObjectSerializer serializer)
 at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T]()
 at Microsoft.Azure.WebJobs.ServiceBus.Triggers.BrokeredMessageToStringConverter.ConvertAsync(BrokeredMessage input, CancellationToken cancellationToken)
 at Microsoft.Azure.WebJobs.ServiceBus.Triggers.ConverterArgumentBindingProvider`1.ConverterArgumentBinding.<BindAsync>d__0.MoveNext()

编辑:WebJobs 文档建议不仅我所做的工作(String)而且 ServiceBusTrigger 应该自动反序列化 JSON 对象。但是,如果我尝试取出我的 POCO,我仍然会收到 XML 反序列化错误。有趣的是,如果我将类型设置为 Byte[],我也会收到 XML 反序列化错误,这也应该可以工作。

编辑2:Stream也不起作用。似乎只有BrokeredMessage 对触发器有效,而 GetBody 是我能找到的从 BrokeredMessage 中获取字符串的唯一方法。

4

4 回答 4

3

通过添加自定义消息处理器,我能够进行反序列化工作。

自定义消息处理器示例可在https://github.com/Azure/azure-webjobs-sdk-samples/tree/master/BasicSamples/MiscOperations

您将 ContentType 设置为 application/json 如下 -

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
    {
        _config = config;
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    private class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions)
            : base(messageOptions)
        {
        }

        public override Task<bool> BeginProcessingMessageAsync(BrokeredMessage message, CancellationToken cancellationToken)
        {
            message.ContentType = "application/json";

            return base.BeginProcessingMessageAsync(message, cancellationToken);
        }
    }
}

然后在 webjob 中配置 ServiceBusConfiguration 时设置此消息处理器。

 JobHostConfiguration config = new JobHostConfiguration();
        ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration
        {
            ConnectionString = _servicesBusConnectionString
        };

        serviceBusConfig.MessagingProvider = new CustomMessagingProvider(serviceBusConfig);

        config.UseServiceBus(serviceBusConfig);
于 2015-09-24T10:41:02.033 回答
2

BrokeredMessage作为对#2的回答,一个hacky的解决方法是从ServiceBusTriggerand call获取 a message.ToBody<Stream>()。然后你可以使用正常的东西将字节流转换为字符串。

将 JSON 编码的有效负载反序列化为对象的示例:


public static async void ProcessQueueMessage([ServiceBusTrigger("my-queue")] BrokeredMessage message)
{
    var stream = message.ToBody();
    using (var streamReader = new StreamReader(stream, Encoding.UTF8)
    {
        var json = await streamReader.ReadToEndAsync();
        var deserialized = JsonConvert.DeserializeObject(json);
    }
}

注意:如果有人有更好的答案,我仍然对更好的答案感兴趣。在一个完美的世界中,我将能够为 ServiceBusTrigger 提供一个自定义反序列化器(基于 JSON.net 或只是一个字符串反序列化器),但如果可能的话,我不知道该怎么做。

于 2015-01-05T13:58:42.443 回答
2

如果您的有效负载是字符串,您有两个选择:

  1. 把参数类型BrokeredMessage改成,然后自己反序列化
  2. BrokeredMessage.ContentType属性设置为text/plain(假设您可以控制生成消息的代码)

除了String需要内容类型的奇怪情况外,规则是服务总线有效负载只能反序列化为与有效负载相同的对象。这是因为 ServiceBus 二进制序列化程序。

于 2015-01-06T07:46:27.353 回答
2

我在研究这个问题时发现了一些巫术。

我们在二进制序列化方面做得很好,但想看看当我们通过 webjobs 仪表板观察 webjobs 处理事件时消息的主体是什么。我从 github 上的 WebJobs ServiceBus 代码中窃取了代码来解决这个问题。对于为什么必须将 poco 序列化为 JSON 并将其打包到内存流中,然后将内容类型设置为 application/json,我仍然有点困惑。我的结论是 Azure 服务总线 SDK 内部一定有一些东西需要它。我还没有找到该代码,但在测试中效果很好。

首先,我们遇到了这种情况,在 webjobs 仪表板中被序列化的对象只是字节。

eventDto 显示为字节

使用下面的代码将消息放在总线上后,我们最终得到:

eventDto 有 json 值

我们不必更改消费端的签名,现在我们可以看到 BrokeredMessage 的正文。 public void PersistEvents([ServiceBusTrigger(EventProcessor.CoolTopicName, LoggingSubscription)] EventDto eventDto, TextWriter logger)

这是 修复它的Azure ServiceBus SDK的代码。
这是我的示例代码(从 SDK 借来的): string json = JsonConvert.SerializeObject(eventDto); // Using Json.net here MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(json), writable: false); BrokeredMessage message = new BrokeredMessage(stream); message.ContentType = "application/json"; client.Send(message);

于 2016-06-13T20:51:29.157 回答