1

我已经设置了一个 Azure 事件中心,我正在从 Python 脚本发送 JSON 格式的 AMQP 消息,并尝试使用流分析将这些消息流式传输到 Power BI。消息是来自物联网设备的非常简单的设备活动

Python 片段是

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf8')
message.body = msg
messenger.put(message)
messenger.send()

我使用 MS 教程中的示例 C# 消息阅读器从事件中心读取数据没有问题,输出为:

Message received.  Partition: '2', Data: '??{"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"}'

但是当我尝试从事件中心测试流分析输入时,我收到了错误

诊断:无法将输入事件反序列化为 Json。一些可能的原因:1)格式错误的事件 2)输入源配置错误的序列化格式

我不确定 Malformed Events 是什么意思 - 我假设流分析可以处理通过 AMQP 发送到事件中心的数据?

我看不出 C# 应用程序收到的 JSON 有什么问题 - 除非 BOM 符号导致问题?

这是我第一次尝试这一切,我搜索了任何类似的帖子都无济于事,所以如果有人能指出我正确的方向,我将不胜感激。

干杯抢

4

3 回答 3

3

This is caused by client API incompatibility. Python uses Proton to send the JSON string in the body of an AMQP Value message. The body is encoded as an AMQP string (AMQP type encoding bytes + utf8 encoded bytes of string). Stream Analytics uses Service Bus .Net SDK which exposes AMQP message as EventData and its body is always byte array. For AMQP value message, it includes the AMQP type encoding bytes as without them it is not possible to decoded the following value. These extra bytes at the beginning will cause JSON serialization to fail.

To achieve interoperability on message body, the application should ensure the publisher and consumer agree on its type and encoding. In this case the publisher should send raw bytes in an AMQP Data message. With the Proton Python API, you can try this:

message.body = msg.encode('utf-8')

The other workaround is to send simple types (e.g. string) in application properties.

Other people also ran into this issue. https://github.com/Azure/amqpnetlite/issues/117

于 2016-07-17T17:56:42.767 回答
1

正如@XinChen 所说,这个问题是由 AMQP 协议引起的。

根据我的经验,以下两种解决方法对这种情况有效。

  1. 使用Send EventREST API 代替带有 AMQP 的 Azure Python SDK,但其余 api 基于 HTTP 协议,性能不高。
  2. 使用 Base64 编码发送 JSON 消息,然后将收到的消息解码为 JSON 字符串。
于 2016-07-25T06:55:56.960 回答
0

这两件事对我有用:

  • 添加message.inferred = True
  • 检查以确保您指定的转储编码encoding='utf-8'encoding='utf8'您的示例不同。

更新的操作:

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf-8')
message.body = msg
message.inferred = True
messenger.put(message)
messenger.send()

通过添加推断标志,我认为消息序列化程序可以正确推断正文是字节并创建 AMPQ DATA,从而解决@Xin Chen 的观点。

消息的推断标志指示消息内容如何编码到 AMQP 部分中。如果推断为真,则消息正文中的二进制和列表值将分别编码为 AMQP DATA 和 AMQP SEQUENCE 部分。如果 inferred 为 false,则消息正文中的所有值都将被编码为 AMQP VALUE 部分,无论其类型如何。

回复:Qpid Proton 文档#inferred

回复:JSON 编码器和解码器#dumps

于 2016-07-28T21:48:15.760 回答