11

当我尝试使用 Avro 在具有我各自架构的数据上运行 Kafka Consumer 时,它返回错误 "AvroRuntimeException: Malformed data. Length is negative: -40" 。我看到其他人在将字节数组转换为 jsonAvro write and read以及Kafka Avro Binary *coder时遇到了类似的问题。我还引用了这个Consumer Group Example,它们都很有帮助,但是到目前为止对这个错误没有帮助。它一直工作到这部分代码(第 73 行)

解码器解码器 = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

我已经尝试过其他解码器并打印出 byteArrayInputStream 变量的内容,该变量看起来像我认为您期望序列化的 avro 数据的样子(在消息中我可以看到架构和一些数据以及一些格式错误的数据)我打印出了使用 .available() 方法可用的字节数,该方法返回 594。我无法理解为什么会发生此错误。Apache Nifi 用于从 hdfs 生成具有相同模式的 Kafka 流。我将不胜感激任何帮助。

4

1 回答 1

22

也许问题是 Nifi 写入(编码)Avro 数据的方式与您的消费者应用程序读取(解码)数据的方式不匹配。

简而言之,Avro 的 API 提供了两种不同的序列化方法:

  1. 用于创建正确的 Avro文件:对数据记录进行编码,同时将 Avro 模式嵌入到一种序言中(通过org.apache.avro.file.{DataFileWriter/DataFileReader})。将架构嵌入 Avro 文件很有意义,因为 (a) 通常 Avro 文件的“有效负载”比嵌入的 Avro 架构大几个数量级,并且 (b) 然后您可以随意复制或移动这些文件并且仍然确保您可以再次阅读它们而无需咨询某人或某事。
  2. 只对数据记录进行编码,即不嵌入模式(通过org.apache.avro.io.{BinaryEncoder/BinaryDecoder};注意包名的区别:iohere vs.file多于)。例如,在对写入 Kafka 主题的 Avro 编码消息时,这种方法通常受到青睐,因为与上面的变体 1 相比,您不会产生将 Avro 模式重新嵌入每条消息的开销,假设您的(非常合理的)策略是,对于同一个 Kafka 主题,消息使用相同的 Avro 模式进行格式化/编码。这是一个显着的优势,因为在流数据上下文中,动态数据记录通常比上述静态数据 Avro 文件小得多(通常在 100 字节和几百 KB 之间)(通常为数百或数千 MB);因此 Avro 模式的大小相对较大,因此您不想在将 2000 条数据记录写入 Kafka 时将其嵌入 2000 倍。缺点是你必须“不知何故” 跟踪 Avro 模式如何映射到 Kafka 主题——或者更准确地说,您必须以某种方式跟踪消息是使用哪个 Avro 模式进行编码的,而无需沿着直接嵌入模式的路径进行。好消息是有Kafka 生态系统(Avro 模式注册表)中可用的工具可以透明地执行此操作。因此,与变体 1 相比,变体 2 以牺牲便利性为代价提高了效率。

效果是编码的 Avro 数据的“有线格式”看起来会有所不同,具体取决于您使用上面的 (1) 还是 (2)。

我对 Apache Nifi 不是很熟悉,但是快速浏览一下源代码(例如ConvertAvroToJSON.java)告诉我它正在使用变体 1,即它在 Avro 记录旁边嵌入了 Avro 模式。但是,您的使用者代码使用DecoderFactory.get().binaryDecoder()变体 2(未嵌入模式)。

也许这解释了您遇到的错误?

于 2016-03-16T10:57:20.750 回答