我找到了怎么做。有两种方法:
- 在 C# 中调用 avro-tools.jar;
- 使用 Apache Avro 库(推荐)。
方案一:首先获取 EventData 消息中的字节,并将其保存到本地文件。
/// <summary>
/// Saves the raw body bytes of an Event Hub message to
/// <c>{functionAppDirectory}/AvroBytesFiles/avro-bytes.avro</c> so that an external
/// tool (e.g. avro-tools) can decode them.
/// </summary>
/// <param name="eventHubMessage">The message whose body bytes are written.</param>
/// <param name="functionAppDirectory">Base directory; the <c>AvroBytesFiles</c> folder is created under it if missing.</param>
/// <returns>A list with the name of the written file, relative to the <c>AvroBytesFiles</c> folder.</returns>
/// <exception cref="ArgumentNullException"><paramref name="eventHubMessage"/> or <paramref name="functionAppDirectory"/> is null.</exception>
public List<string> SaveAvroBytesOnFile(EventData eventHubMessage, string functionAppDirectory)
{
    if (eventHubMessage == null)
    {
        throw new ArgumentNullException(nameof(eventHubMessage));
    }
    if (functionAppDirectory == null)
    {
        throw new ArgumentNullException(nameof(functionAppDirectory));
    }

    const string fileName = "avro-bytes.avro";
    string directory = Path.Combine(functionAppDirectory, "AvroBytesFiles");
    Directory.CreateDirectory(directory);

    // Path.Combine inserts the directory separator that "{path}{file}" interpolation omitted,
    // so the file ends up inside AvroBytesFiles, which is where readers look for it.
    File.WriteAllBytes(Path.Combine(directory, fileName), eventHubMessage.GetBytes());

    return new List<string> { fileName };
}
然后从 Azure 函数中调用 avro-tools.jar,并将其输出重定向到一个变量中:
// Run avro-tools' "fragtojson" command on the bytes saved earlier and capture its JSON output in `output`.
// NOTE(review): the java.exe path below is machine-specific; adjust it for the target environment.
string avroResourcesPath = Path.Combine(functionAppDirectory, "AvroResources");
string output;
using (Process myProcess = new Process())
{
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.FileName = @"D:\Program Files\Java\jdk1.8.0_73\bin\java.exe";

    // Use the file with the bytes saved before and the Avro schema file.
    string jarPath = Path.Combine(avroResourcesPath, "avro-tools-1.8.2.jar");
    string schemaPath = Path.Combine(avroResourcesPath, "schemafile.avsc");
    string bytesPath = Path.Combine(functionAppDirectory, "AvroBytesFiles", byteFileNames[i]);

    // Quote every path: a directory containing spaces would otherwise be split into several arguments.
    myProcess.StartInfo.Arguments = $"-jar \"{jarPath}\" fragtojson --schema-file \"{schemaPath}\" \"{bytesPath}\"";
    myProcess.StartInfo.RedirectStandardOutput = true;
    myProcess.Start();

    // Drain stdout before WaitForExit so a full pipe buffer cannot block the child process.
    output = myProcess.StandardOutput.ReadToEnd();
    myProcess.WaitForExit();
}
avro-tools 反序列化字节后得到的结构可能与您需要的模型不同,因此需要把 avro-tools 的模型映射到您自己的模型上。模型越复杂,这一步消耗的资源就越多。
AvroToolModel avroToolModel= JsonConvert.DeserializeObject<AvroTool>(output);
// map the avro model in my model
MyMode myModel = new MyModel(avroToolModel);
方案二
这是推荐的解决方案。可以用几行代码执行反序列化。
// Avro schema the producer used to encode the message (the writer schema).
string schema = @"...";
Schema writerSchema = Schema.Parse(schema);

MyModel output;
// A MemoryStream constructed over a byte array already starts at position 0, so no Seek is needed.
using (MemoryStream memStream = new MemoryStream(eventHubMessage.GetBytes()))
{
    // The writer schema is also passed as the reader schema, so the record is read as-written.
    Avro.Specific.SpecificDatumReader<MyModel> reader = new Avro.Specific.SpecificDatumReader<MyModel>(writerSchema, writerSchema);
    output = reader.Read(null, new Avro.IO.BinaryDecoder(memStream));
}
模型类必须实现 ISpecificRecord 接口,如下所示:
/// <summary>
/// Avro specific record for the message payload. Fields are read and written by
/// position through <see cref="Get"/> and <see cref="Put"/>; those positions must
/// match the field order declared in <see cref="_SCHEMA"/>.
/// </summary>
[DataContract]
public class MyModel: ISpecificRecord
{
    [DataMember]
    public string Id;
    [DataMember]
    public enumP Type;
    [DataMember]
    public long Timestamp;
    // NOTE(review): not marked [DataMember], so DataContract-based serializers skip it — confirm intended.
    public Dictionary<string, string> Context;

    /// <summary>Parsed Avro schema for this record type.</summary>
    public static Schema _SCHEMA = Avro.Schema.Parse(@"...");

    /// <summary>The Avro schema of this record.</summary>
    public virtual Schema Schema
    {
        get
        {
            // Previously returned Position._SCHEMA, a leftover from another generated class.
            return MyModel._SCHEMA;
        }
    }

    /// <summary>Returns the value of the field at schema position <paramref name="fieldPos"/>.</summary>
    /// <exception cref="AvroRuntimeException"><paramref name="fieldPos"/> is not 0-3.</exception>
    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.Id;
            case 1: return this.Timestamp;
            case 2: return this.Type;
            case 3: return this.Context;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        }
    }

    /// <summary>Sets the field at schema position <paramref name="fieldPos"/> to <paramref name="fieldValue"/>.</summary>
    /// <exception cref="AvroRuntimeException"><paramref name="fieldPos"/> is not 0-3.</exception>
    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.Id = (System.String)fieldValue; break;
            case 1: this.Timestamp = (System.Int64)fieldValue; break;
            case 2: this.Type = (enumP)fieldValue; break;
            case 3: this.Context = (Dictionary<string,string>)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        }
    }
}
/// <summary>
/// Enum field type used by <c>MyModel.Type</c>.
/// NOTE(review): presumably the member names must match the symbols of the Avro enum in the schema — confirm.
/// </summary>
[DataContract]
public enum enumP
{
    ONE = 0,
    TWO = 1,
    THREE = 2,
}
MyModel 类中的字段名称必须与所用模式中的字段名称一致,且 Get/Put 中使用的索引必须与字段在模式中的声明顺序相同。