0

我已经使用捕获功能将事件中心输出到 Blob 存储 - 这会将我们作为 .avro 文件放入队列中的任何内容输出。

如果我下载这个文件并尝试使用像 avro-js 这样的库来解析它,我没有问题 - 我可以很好地读取文件并按照我认为合适的方式处理内容。

但是 - 在使用 Node 处理 Azure Blob 存储时,我想将文件处理为已下载的文件。读取文件时返回的格式是缓冲区,但我找不到使用库成功解析此文件的方法(找不到正确的方法,如果有的话)。

用于从 Azure 下载 blob 的代码,省略了一些位:

const { BlobServiceClient } = require('@azure/storage-blob');
const blobServiceClient = BlobServiceClient.fromConnectionString(AZURE_STORAGE_CONNECTION_STRING);
const containerClient = blobServiceClient.getContainerClient("data");

const blockBlobClient = containerClient.getBlockBlobClient(blob.name);
    
const downloadBlockBlobResponse = await blockBlobClient.download(0);

输出到控制台时的缓冲区片段:

<Buffer 4f 62 6a 01 04 14 61 76 72 6f 2e 63 6f 64 65 63 08 6e 75 6c 6c 16 61 76 72 6f 2e 73 63 68 65 6d 61 ec 06 7b 22 74 79 70 65 22 3a 22 72 65 63 6f 72 64 ... 589 more bytes>

转换为字符串时的内容(由于乱码无法正确输出而粘贴图像):

在此处输入图像描述

已尝试将 .avro 文件作为纯文本读取,虽然它们大部分都可以,但有一些字符是乱码,因此它不会读取为 JSON(而且我不想对内容做出假设尝试将消息正文拉出)。

有没有人根据 Buffers 从 Azure 中成功提取 .avro 内容?我在网上看到了很多关于将这些加载到 Spark 或 Kafka 中的指导,但不仅仅是读取流中的文件。

谢谢!

4

1 回答 1

1

关于这个问题,我们可以使用该包avsc来解析带有缓冲区的 avro 文件。更多详情,请参阅此处

例如

const avro = require("avsc");
const {
  BlobServiceClient,
  StorageSharedKeyCredential,
} = require("@azure/storage-blob");

const accountName = "";
const accountKey =
  "";
async function main() {
  const creds = new StorageSharedKeyCredential(accountName, accountKey);
  const blobServiceClient = new BlobServiceClient(
    `https://${accountName}.blob.core.windows.net`,
    creds
  );
  const containerClient = blobServiceClient.getContainerClient("");
  const blockBlobClient = containerClient.getBlockBlobClient(
    ""
  );

  const downloadBlockBlobResponse = await blockBlobClient.download(0);
  const buf = await streamToBuffer(
    downloadBlockBlobResponse.readableStreamBody
  );

  const decoder = new avro.streams.BlockDecoder({
    parseHook: (schema) => {
      console.log("the avro file schema:");
      console.log(schema);
      return avro.Type.forSchema(schema, { wrapUnions: true });
    },
  });

  decoder.on("data", (data) => {
    console.log(data);
  });
  decoder.end(buf);
}
async function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on("data", (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end", () => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error", reject);
  });
}
main();

在此处输入图像描述

于 2021-01-04T03:14:58.193 回答