带解码器问题的Kafka Avro Consumer

当我尝试使用我的相应模式使用Avro运行Kafka Consumer时 ,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。 我看到其他人有类似的问题,将字节数组转换为json , Avro写入和读取 ,以及Kafka Avro Binary *编码器 。 我也引用了这个消费者组示例 ,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行)

解码器解码器= DecoderFactory.get()。binaryDecoder(byteArrayInputStream,null);

我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误。 Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流。 我将不胜感激任何帮助。

也许问题是Nifi如何编写(编码)Avro数据与消费者应用程序读取(解码)数据的方式不匹配。

简而言之,Avro的API提供了两种不同的序列化方法:

  1. 用于创建正确的Avro 文件 :对数据记录进行编码,还要将Avro架构嵌入到一种前导码中(通过org.apache.avro.file.{DataFileWriter/DataFileReader} )。 将模式嵌入到Avro文件中非常有意义,因为(a)Avro文件的“有效负载”通常比嵌入式Avro模式大一些,并且(b)然后您可以根据自己的内容复制或移动这些文件并且仍然确保你可以再次阅读它们,而无需咨询某人或某事。
  2. 仅编码数据记录,即不嵌入模式(通过org.apache.avro.io.{BinaryEncoder/BinaryDecoder} ;注意包名称的差异: io here vs file above)。 例如,当对正在写入Kafka主题的消息进行Avro编码时,这种方法通常很受欢迎,因为与上面的变体1相比,您不会产生将Avro架构重新嵌入到每个消息中的开销,假设您的(非常合理)策略是,对于相同的Kafka主题,消息使用相同的Avro架构进行格式化/编码。 这是一个显着的优点,因为在流数据上下文中,动态数据记录通常比上面描述的静态数据Avro文件小得多(通常在100字节到几百KB之间)(通常是数百或者数千MB); 因此,Avro架构的大小相对较大,因此在向Kafka写入2000个数据记录时,您不希望将其嵌入2000x。 缺点是您必须“以某种方式”跟踪Avro架构如何映射到Kafka主题 – 或者更准确地说,您必须以某种方式跟踪编码消息的Avro架构,而不必直接嵌入架构的路径。 好消息是Kafka生态系统(Avro架构注册表)中有工具可用于透明地执行此操作。 因此,与变体1相比,变体2以便利性为代价获得了效率。

结果是,编码的Avro数据的“有线格式”看起来会有所不同,具体取决于您使用上面的(1)还是(2)。

我对Apache Nifi不太熟悉,但是快速查看源代码(例如ConvertAvroToJSON.java )向我建议它使用变量1,即它将Avro架构与Avro记录一起嵌入。 但是,您的使用者代码使用DecoderFactory.get().binaryDecoder() ,因此使用变体2(没有嵌入模式)。

也许这解释了你遇到的错误?