Avro Schema引发StructType

这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。

我正在使用Sparkdataframe,它可以从几个不同的模式版本之一加载数据:

// Version One {"namespace": "com.example.avro", "type": "record", "name": "MeObject", "fields": [ {"name": "A", "type": ["null", "int"], "default": null} ] } // Version Two {"namespace": "com.example.avro", "type": "record", "name": "MeObject", "fields": [ {"name": "A", "type": ["null", "int"], "default": null}, {"name": "B", "type": ["null", "int"], "default": null} ] } 

我正在使用Spark Avro加载数据。

 DataFrame df = context.read() .format("com.databricks.spark.avro") .load("path/to/avro/file"); 

可以是Version One文件或Version Two文件。 但是,我希望能够以相同的方式处理它,将未知值设置为“null”。 我之前的问题中的建议是设置模式,但是我不想重复自己在.avro文件中编写模式以及作为StructType和朋友。 如何将avro架构(文本文件或生成的MeObject.getClassSchema() )转换为sparks StructType

Spark Avro有一个SchemaConverters ,但它都是私有的,并返回一些奇怪的内部对象。

免责声明 :这是一种肮脏的黑客行为。 这取决于几件事:

  • Python提供了一个轻量级的Avro处理库 ,由于它的动态性,它不需要类型化的编写器
  • 空Avro文件仍然是有效文档
  • Spark模式可以与JSON进行转换

以下代码读取Avro架构文件,使用给定架构创建空Avro文件,使用spark-csv读取它并将Spark架构输出为JSON文件。

 import argparse import tempfile import avro.schema from avro.datafile import DataFileWriter from avro.io import DatumWriter from pyspark import SparkContext from pyspark.sql import SQLContext def parse_schema(schema): with open(schema) as fr: return avro.schema.parse(open(schema).read()) def write_dummy(schema): tmp = tempfile.mktemp(suffix='.avro') with open(tmp, "w") as fw: writer = DataFileWriter(fw, DatumWriter(), schema) writer.close() return tmp def write_spark_schema(path, schema): with open(path, 'w') as fw: fw.write(schema.json()) def main(): parser = argparse.ArgumentParser(description='Avro schema converter') parser.add_argument('--schema') parser.add_argument('--output') args = parser.parse_args() sc = SparkContext('local[1]', 'Avro schema converter') sqlContext = SQLContext(sc) df = (sqlContext.read.format('com.databricks.spark.avro') .load(write_dummy(parse_schema(args.schema)))) write_spark_schema(args.output, df.schema) sc.stop() if __name__ == '__main__': main() 

用法:

 bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \ avro_to_spark_schema.py \ --schema path_to_avro_schema.avsc \ --output path_to_spark_schema.json 

读取架构:

 import scala.io.Source import org.apache.spark.sql.types.{DataType, StructType} val json: String = Source.fromFile("schema.json").getLines.toList.head val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType] 

请看看这是否有所帮助,尽管很晚。 我正在努力完成目前的工作。 我使用过Databricks的schemaconverter。 我想,你试图用给定的模式读取avro文件。

  val schemaObj = new Schema.Parser().parse(new File(avscfilepath)); var sparkSchema : StructType = new StructType import scala.collection.JavaConversions._ for(field <- schemaObj.getFields()){ sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType) } sparkSchema