0

我正在尝试使用“Microsoft Avro Library”将我的 C# 类序列化为“Avro”并将其发送到事件中心。但是,当我尝试通过流分析读取数据时,它会在日志“无效 Avro 格式,删除无效记录”中出现此错误

更多详细信息.. 使用https://azure.microsoft.com/en-in/documentation/articles/hdinsight-dotnet-avro-serialization/中所示的反射方法 序列化为 avro 格式并将其发送到事件中心

//Create a new AvroSerializer instance and specify a custom serialization strategy AvroDataContractResolver
        //for serializing only properties attributed with DataContract/DateMember
        var avroSerializer = AvroSerializer.Create<SensorData>();

        //Create a memory stream buffer
        using (var buffer = new MemoryStream())
        {
            //Create a data set by using sample class and struct
            var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

            //Serialize the data to the specified stream
            avroSerializer.Serialize(buffer, expected);
            var bytes = buffer.ToArray();
            var data = new EventData(bytes) {PartitionKey = "deviceId"};
            // send to event hub client
            eventHubClient.Send(data);
        }

事件可以很好地发布到事件中心。我创建了一个可以使用这些事件并能够反序列化它们的工作角色。

但是,当我将此事件中心设置为流分析的输入并将事件序列化格式设置为“avro”时,它会出现以下错误。

消息:无效的 Avro 格式,删除无效记录。

消息:IncorrectSerializationFormat 错误发生得太快。他们被暂时压制

我想我也必须包括 Avro Schema。谁能指导我正确的方法将 C# 类序列化为“avro”以便流分析可以理解它?

谢谢你的时间。

4

1 回答 1

2

您将必须包含架构。下面是一个如何与 Schema 一起发送事件的示例。这使用了 AvroContainer。

        var eventHubClient = EventHubClient.CreateFromConnectionString("ReplaceConnectionString","ReplaceEventHubPath");
        int numberOfEvents = 10;
        using (var memoryStream = new MemoryStream())
        using (var avroWriter = AvroContainer.CreateWriter<SensorData>(memoryStream, Codec.Null))
        using (var sqWriter = new SequentialWriter<SensorData>(avroWriter, numberOfEvents))
        {
            Enumerable.Range(0, numberOfEvents)
                .Select(i => new SensorData() { Id = "DeviceId", Value = i })
                .ToList()
                .ForEach(data => sqWriter.Write(data));
            memoryStream.Seek(0, SeekOrigin.Begin);
            var eventData = new EventData(memoryStream.ToArray());
            eventHubClient.Send(eventData);
        }
于 2015-10-09T06:12:41.683 回答