1

在 Microsoft Azure 中,我们有一个事件中心,用于捕获 JSON 数据并将其以 AVRO 格式存储在 blob 存储帐户中:

存储帐户屏幕截图

我编写了一个 python 脚本,它将从事件中心获取 AVRO 文件:

import os, avro
from io import BytesIO
from operator import itemgetter, attrgetter
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

conn_str = 'DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net'
container_name = 'container1'

blob_service_client = BlobServiceClient.from_connection_string(conn_str)
container_client = blob_service_client.get_container_client(container_name)

blob_list = []
for blob in container_client.list_blobs():
    if blob.name.endswith('.avro'):
        blob_list.append(blob)

blob_list.sort(key=attrgetter('creation_time'), reverse=True)

这很好用,我得到了一个按创建时间排序的 AVRO blob 列表。

现在我正在尝试添加下载 blob、解析 AVRO 格式的数据并检索 JSON 有效负载的最后步骤。

我尝试将列表中的每个 blob 检索到内存缓冲区并对其进行解析:

for blob in blob_list:
    blob_client = container_client.get_blob_client(blob.name)
    downloader = blob_client.download_blob()
    stream = BytesIO()
    downloader.download_to_stream(stream) # also tried readinto(stream)

    reader = DataFileReader(stream, DatumReader())
    for event_data in reader:
        print(event_data)
    reader.close()

不幸的是,上面的 Python 代码不起作用,没有打印任何内容。

我也看到了,有一种StorageStreamDownloader.readall()方法,但我不确定如何应用它。

我正在使用 pip 安装的 Windows 10、python 3.8.5 和 avro 1.10.0。

4

1 回答 1

1

使用readall()方法时,应按如下方式使用:

       with open("xxx", "wb+") as my_file: 
           my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.

有关读取捕获的 eventthub 数据的更多详细信息,您可以参考此官方文档:创建 Python 脚本以读取您的捕获文件

如果您还有更多问题,请告诉我:)。

于 2020-10-02T05:49:35.027 回答