0

我正在使用 azure 事件中心 python SDK 在此链接之后向事件中心发送和接收消息。https://github.com/Azure/azure-event-hubs-python/tree/develop。我可以成功发送和接收消息。但是我如何解析消息并从事件数据对象中检索数据。请在下面找到代码。

import os
import sys
#import logging
from azure.eventhub import EventHubClient, Receiver, Offset

ADDRESS = 'sb://####.servicebus.windows.net/#####'
USER = '##########'
KEY = '##################################'
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "1"


total = 0
last_sn = -1
last_offset = "-1"

try:
  if not ADDRESS:
      raise ValueError("No EventHubs URL supplied.")
  client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
  receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, 
  offset=OFFSET)
  client.run()
  try:
      batched_events = receiver.receive(timeout=20)
  except:
      raise
  finally:
      client.stop()
  for event_data in batched_events:
      last_offset = event_data.offset.value
      last_sn = event_data.sequence_number
      total += 1
      print("Partition {}, Received {}, sn={} offset={}".format(
         PARTITION,
         total,
         last_sn,
         last_offset))

except KeyboardInterrupt:
   pass

如果我尝试查看收到的 event_data,我可以看到以下消息。event_data <azure.eventhub.common.EventData at 0xd4f1358> event_data.message

<uamqp.message.Message at 0xd4f1240>

以上关于如何解析此消息以提取数据的任何帮助

4

2 回答 2

3

截至1.1.0,有新的实用方法来提取消息的实际数据:

那么,曾经是什么

import json
event_obj = json.loads(next(event_data.body).decode('UTF-8'))

就是现在:

event_obj = event_data.body_as_json()
于 2018-12-02T08:53:10.667 回答
1

对于使用事件中心版本 5.2.0 的人——今天最新(GitHub参考文档),它与 1.1.0 版本相同,即使用body_as_str()body_as_json()。但是客户端发生了变化——新版本中有一个EventHubProducerClient和一个EventHubConsumerClient。要打印收到的事件的正文:

from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubConsumerClient.from_connection_string(
        connection_str, consumer_group, eventhub_name=eventhub_name
    )

def on_event_batch(partition_context, events):
    partition_context.update_checkpoint()
    for e in events:
        print(e.body_as_str())

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
于 2020-11-11T01:25:18.413 回答