Tag: avro

Apache Avro框架可以在序列化期间处理参数化类型吗?

Apache Avro可以在序列化期间处理参数化类型吗? 当我尝试序列化使用generics的实例时,我看到从Avro框架抛出此exception – org.apache.avro.AvroTypeException: Unknown type: T at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:255) at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:514) at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:593) at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:75) at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:472) at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189) 我尝试序列化的类看起来像这样 public class Property { private T propertyValue; } 我正在尝试根据传入的POJO实例动态生成架构。 我的序列化代码如下所示 – ByteArrayOutputStream os = new ByteArrayOutputStream(); ReflectData reflectData = ReflectData.AllowNull.get(); Schema schema = reflectData.getSchema(propertyValue.getClass()); DatumWriter writer = new ReflectDatumWriter(schema); Encoder encoder = EncoderFactory.get().jsonEncoder(schema, os); […]

Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer

每当我尝试从kafka队列中读取消息时,我都会遇到以下exception: [error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79) at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) 卡夫卡制片人代码: public class AvroSpecificProducer { private static Properties kafkaProps = new Properties(); private static KafkaProducer kafkaProducer; static { kafkaProps.put(“bootstrap.servers”, “localhost:9092”); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); kafkaProps.put(“schema.registry.url”, […]

KafkaAvroDeserializer不返回SpecificRecord但返回GenericRecord

我的KafkaProducer能够使用KafkaAvroSerializer将对象序列化为我的主题。 但是, KafkaConsumer.poll()返回反序列化的GenericRecord而不是我的序列化类。 MyKafkaProducer KafkaProducer producer; try (InputStream props = Resources.getResource(“producer.props”).openStream()) { Properties properties = new Properties(); properties.load(props); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); properties.put(“schema.registry.url”, “http://localhost:8081”); MyBean bean = new MyBean(); producer = new KafkaProducer(properties); producer.send(new ProducerRecord(topic, bean.getId(), bean)); 我的KafkaConsumer try (InputStream props = Resources.getResource(“consumer.props”).openStream()) { properties.load(props); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class); properties.put(“schema.registry.url”, “http://localhost:8081”); consumer = new KafkaConsumer(properties); […]

从HDFS读取一个简单的Avro文件

我试图简单读取存储在HDFS中的Avro文件。 我发现当它在本地文件系统上时如何读取它…. FileReader reader = DataFileReader.openReader(new File(filename), new GenericDatumReader()); for (GenericRecord datum : fileReader) { String value = datum.get(1).toString(); System.out.println(“value = ” value); } reader.close(); 但是我的文件是HDFS。 我不能给openReader一个Path或一个FSDataInputStream。 我怎样才能简单地读取HDFS中的Avro文件? 编辑:我通过创建实现SeekableInput的自定义类(SeekableHadoopInput)来实现此目的。 我从github上的“Ganglion”中“偷”了这个。 似乎仍然会有一个Hadoop / Avro集成路径。 谢谢

从Avro GenericRecord获取输入值

给定GenericRecord ,与Object相反,检索类型值的推荐方法是什么? 我们是否期望投射这些值,如果是这样,从Avro类型到Java类型的映射是什么? 例如,Avro Array == Java Collection ; 和Avro String == Java Utf8 。 由于每个GenericRecord都包含其架构,我希望以类型安全的方式检索值。

Avro:当我不知道“writer”使用的确切模式时,如何使用默认字段

在Java Avro中,如何将下面的data1 , data2和data3解析为GenericRecord 。 //Schema { “type”: “record”, “name”: “user”, “fields”: [ {“name”: “name”, “type”: “string”}, {“name”: “colour”, “type”: “string”, “default”: “green”}, {“name”: “mass”, “type”: “int”, “default”: 100} ] } //data 1 {“name”:”Sean”} //data 2 {“name”:”Sean”, “colour”:”red”} //data 3 {“name”:”Sean”, “colour”:”red”, “mass”:200} 我已经看过一些关于模式演化等的讨论,并且能够将作者的模式和读者的模式传递给GenericDatumReader和ResolvingDecoder,但我只有一个模式。 一般来说,我不知道作者使用的确切模式(如果有的话)。 我可以通过解析模式并使用默认值删除所有字段来“推断”“基础”模式。 但是,如果有多个字段具有默认值,则某些字段可能存在/可能不存在,因此我将无法推断出符合数据的模式。 例如 如果我使用给定的模式尝试GenericDatumReader来读取数据3,那么解析就成功了。 如果我使用推断的模式尝试GenericDatumReader来读取data1,那么解析就成功了。 如果我使用ResolvingDecoder使用推断架构和给定架构来尝试GenericDatumReader来读取数据1,那么解析就成功了。 所有其他选项都无法将data1和data3解析为GenericRecord,其中包含JSON字符串的所有值以及缺少字段的相应默认值。 并且似乎根本无法正确解析data2! 有人有什么建议吗? import […]

Avro架构不支持向后兼容性

我有这个avro架构 { “namespace”: “xx.xxxx.xxxxx.xxxxx”, “type”: “record”, “name”: “MyPayLoad”, “fields”: [ {“name”: “filed1”, “type”: “string”}, {“name”: “filed2”, “type”: “long”}, {“name”: “filed3”, “type”: “boolean”}, { “name” : “metrics”, “type”: { “type” : “array”, “items”: { “name”: “MyRecord”, “type”: “record”, “fields” : [ {“name”: “min”, “type”: “long”}, {“name”: “max”, “type”: “long”}, {“name”: “sum”, “type”: “long”}, {“name”: “count”, “type”: […]

如何在Java或Scala中读取和写入来自/到镶木地板文件的Map ?

寻找一个关于如何在Java或Scala中读取和写入来自/到镶木地板文件的Map的简明示例? 这是期望的结构,使用com.fasterxml.jackson.databind.ObjectMapper作为Java中的序列化程序(即使用镶木地板查找等效项): public static Map read(InputStream inputStream) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(inputStream, new TypeReference<Map>() { }); } public static void write(OutputStream outputStream, Map map) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.writeValue(outputStream, map); }

使用apache avro反映

Avro序列化在Hadoop用户中很受欢迎,但很难找到示例。 有谁能帮我这个示例代码? 我最感兴趣的是使用Reflect API来读/写文件并使用Union和Null注释。 public class Reflect { public class Packet { int cost; @Nullable TimeStamp stamp; public Packet(int cost, TimeStamp stamp){ this.cost = cost; this.stamp = stamp; } } public class TimeStamp { int hour = 0; int second = 0; public TimeStamp(int hour, int second){ this.hour = hour; this.second = second; } } […]

Avro字段默认值

我遇到了一些设置Avro字段默认值的问题。 我有一个简单的架构,如下所示: data.avsc: { “namespace”:”test”, “type”:”record”, “name”:”Data”, “fields”:[ { “name”: “id”, “type”: [ “long”, “null” ] }, { “name”: “value”, “type”: [ “string”, “null” ] }, { “name”: “raw”, “type”: [ “bytes”, “null” ] } ] } 我使用avro-maven-plugin v1.7.6来生成Java模型。 当我使用以下方法创建模型的实例时: Data data = Data.newBuilder().build(); ,它失败,但有例外: org.apache.avro.AvroRuntimeException:org.apache.avro.AvroRuntimeException:字段ID类型:UNION pos:0未设置且没有默认值。 但是,如果我指定“默认”属性, { “name”: “id”, “type”: [ “long”, “null” […]