1

我正在尝试向事件中心发送和接收 json 数据。但我收到错误消息:接收数据时,事件数据与 JSON 类型不兼容。这是我的发送和接收代码。

#Event_Hub_send.py
import sys
import logging
import datetime
import time
import os

from azure.eventhub import EventHubClient, Sender, EventData

logger = logging.getLogger("azure")

ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"

try:
    if not ADDRESS: 
        raise ValueError("No EventHubs URL supplied.")

    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    try:
    employee = [
        { "name": "sankar", "age": 28 }, 
        { "name": "Madhan", "age": 21 }, 
        { "name": "Vishwa", "age": 32 }
    ]

        start_time = time.time()
        for i in employee:
            print("Sending message: {}".format(i))
            sender.send(EventData(i))
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass

发送时我没有收到任何错误。

#Event_Hub_Receive.py
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
import json

logger = logging.getLogger("azure")

ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"

CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    for event_data in receiver.receive(timeout=100):
    data = event_data.body_as_json()
    print(data)

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

我试图以 json 格式接收数据。但抛出事件数据与 JSON 类型不兼容:无法解码 JSON 对象。

    raise TypeError("Event data is not compatible with JSON type: {}".format(e))
TypeError: Event data is not compatible with JSON type: No JSON object could be decoded

所以我尝试将方法更改为event_data.body_as_str(). 但我收到以下回复:

agename
agename
Received 0 messages in 0.263074874878 seconds

有人可以建议吗

4

1 回答 1

0

尝试这样做:

import json # add this 

for i in employee:
    print("Sending message: {}".format(i))
    sender.send(EventData(json.dumps(i)))

如果是这种情况,上面应该可以工作,这似乎i是一种类型。dict

于 2019-07-23T01:40:53.407 回答