0

问题如下。

脚步:

  1. 应用程序将一些自定义对象转换为 avro 片段(字节数组);
  2. 这个 avro 片段被发送到 EventData 对象中的事件中心;
  3. 事件中心触发一个从事件中心接收 Mcrosoft.ServiceBus.Messaging.EventData 的 azure 函数;
  4. 我可以提取 EventData 的主体,它包含点 1 的 avro 片段(字节数组)。

我正在使用 Microsoft.Hadoop.Avro。

我有原始自定义对象的架构(第 1 点),所以我尝试创建一个从 avro 片段读取的通用读取器,但我收到以下错误:

流中的 Avro 对象容器无效。无法识别标头。

似乎 Microsoft.Hadoop.Avro 只能管理完整的 avro 文件(标题 + 架构 + 正文)而不是 avro 片段(正文)。

使用 java avro-tool 我可以将模式添加到 avro 片段。.Net 或 .Net Core 也可以吗?我能怎么做?

为简单起见,我将来自事件中心的 EventData 替换为相关的 avro 文件。

using (Stream stream = new FileStream(@"...\trip-real-0-2019-03-14-12-14.avro", FileMode.Open, FileAccess.Read, FileShare.Read))
{
    // create a generic reader for the event hub avro message
    using (var reader = AvroContainer.CreateGenericReader(stream))
    {
        while (reader.MoveNext())
        {
            foreach (dynamic record in reader.Current.Objects)
            {
                //get the body of the event hub message (fragment avro bytes)
                var avroFragmentByeArray = (byte[])(record.Body);

                // try to create a generic reader with the schema.
                // this line throws an exception
                using (var r = AvroContainer.CreateGenericReader(schema, new MemoryStream(avroFragmentByeArray), true, new CodecFactory()))                                    
                {

                }
            }
        }
    }
}
4

1 回答 1

0

我找到了怎么做。有两种方法:

  1. 使用 C# 中的 avro-tool.jar;
  2. 使用 Apache Avro 库(推荐)。

1°解决方案首先获取事件数据消息中的字节并保存在本地。

public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
    {
        try
        {                
            string fileName = "avro-bytes.avro";
            List<string> filesToProcess = new List<string>();
            string singleFileNameToSave = fileName;
            filesToProcess.Add(singleFileNameToSave);              
            string path = Path.Combine(functionAppDirectory,"AvroBytesFiles");  
            System.IO.Directory.CreateDirectory(path);              
            File.WriteAllBytes($"{path}{singleFileNameToSave}", eventHubMessage.GetBytes());                
            return filesToProcess;
        }
        catch (Exception ex)
        {
            throw;
        }
    }

比从 azure 函数调用 avro-tool.jar 并将输出重定向到变量中

 Process myProcess = new Process();
 myProcess.StartInfo.UseShellExecute = false;
 myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";                   
 // execute avro tools         
 string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
 // here you must use the file with the bytes saved before and the avroschema file
 myProcess.StartInfo.Arguments = $"-jar {Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar")} fragtojson --schema-file {Path.Combine(avroResourcesPath, "schemafile.avsc")} {Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i])}";
 myProcess.StartInfo.RedirectStandardOutput = true;
 myProcess.Start();
 // print the output to a string 
 string output = myProcess.StandardOutput.ReadToEnd();
 myProcess.WaitForExit();

Avro-tool 可能会使用与您需要的模式不同的模式来反序列化字节,因此您需要将 avro-tool 模型映射到您的模型上。随着模型复杂性的变化,这一步会消耗很多资源。

AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);

2°解决方案

这是推荐的解决方案。可以用几行代码执行反序列化。

string schema = @"...";
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
   memStream.Seek(0, SeekOrigin.Begin);
   Schema writerSchema = Schema.Parse(schema);
   Avro.Specific.SpecificDatumReader<MyModel> r = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
   output = r.Read(null, new Avro.IO.BinaryDecoder(memStream));
}

模型类必须实现 ISpecificRecord 接口,如下所示:

[DataContract]
public class MyModel: ISpecificRecord
{
    [DataMember]
    public string Id;
    [DataMember]
    public enumP Type;
    [DataMember]
    public long Timestamp;
    public Dictionary<string, string> Context;

    public static Schema _SCHEMA = Avro.Schema.Parse(@"...");

    public virtual Schema Schema
    {
        get
        {
            return Position._SCHEMA;
        }
    }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.Id;
            case 1: return this.Timestamp;
            case 2: return this.Type;                
            case 3: return this.Context;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.Id = (System.String)fieldValue; break;
            case 1: this.Timestamp = (System.Int64)fieldValue; break;
            case 2: this.Type = (enumP)fieldValue; break;                
            case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}

[DataContract]
public enum enumP
{
    ONE, TWO, THREE
}

MyModel 类中的属性名称在使用的模式中必须相同。

于 2019-03-23T13:11:29.287 回答