Avro Schema引发StructType
这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。
我正在使用Sparkdataframe,它可以从几个不同的模式版本之一加载数据:
// Version One {"namespace": "com.example.avro", "type": "record", "name": "MeObject", "fields": [ {"name": "A", "type": ["null", "int"], "default": null} ] } // Version Two {"namespace": "com.example.avro", "type": "record", "name": "MeObject", "fields": [ {"name": "A", "type": ["null", "int"], "default": null}, {"name": "B", "type": ["null", "int"], "default": null} ] }
我正在使用Spark Avro加载数据。
DataFrame df = context.read() .format("com.databricks.spark.avro") .load("path/to/avro/file");
可以是Version One文件或Version Two文件。 但是,我希望能够以相同的方式处理它,将未知值设置为“null”。 我之前的问题中的建议是设置模式,但是我不想重复自己在.avro
文件中编写模式以及作为StructType
和朋友。 如何将avro架构(文本文件或生成的MeObject.getClassSchema()
)转换为sparks StructType
?
Spark Avro有一个SchemaConverters
,但它都是私有的,并返回一些奇怪的内部对象。
免责声明 :这是一种肮脏的黑客行为。 这取决于几件事:
- Python提供了一个轻量级的Avro处理库 ,由于它的动态性,它不需要类型化的编写器
- 空Avro文件仍然是有效文档
- Spark模式可以与JSON进行转换
以下代码读取Avro架构文件,使用给定架构创建空Avro文件,使用spark-csv
读取它并将Spark架构输出为JSON文件。
import argparse import tempfile import avro.schema from avro.datafile import DataFileWriter from avro.io import DatumWriter from pyspark import SparkContext from pyspark.sql import SQLContext def parse_schema(schema): with open(schema) as fr: return avro.schema.parse(open(schema).read()) def write_dummy(schema): tmp = tempfile.mktemp(suffix='.avro') with open(tmp, "w") as fw: writer = DataFileWriter(fw, DatumWriter(), schema) writer.close() return tmp def write_spark_schema(path, schema): with open(path, 'w') as fw: fw.write(schema.json()) def main(): parser = argparse.ArgumentParser(description='Avro schema converter') parser.add_argument('--schema') parser.add_argument('--output') args = parser.parse_args() sc = SparkContext('local[1]', 'Avro schema converter') sqlContext = SQLContext(sc) df = (sqlContext.read.format('com.databricks.spark.avro') .load(write_dummy(parse_schema(args.schema)))) write_spark_schema(args.output, df.schema) sc.stop() if __name__ == '__main__': main()
用法:
bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \ avro_to_spark_schema.py \ --schema path_to_avro_schema.avsc \ --output path_to_spark_schema.json
读取架构:
import scala.io.Source import org.apache.spark.sql.types.{DataType, StructType} val json: String = Source.fromFile("schema.json").getLines.toList.head val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
请看看这是否有所帮助,尽管很晚。 我正在努力完成目前的工作。 我使用过Databricks的schemaconverter。 我想,你试图用给定的模式读取avro文件。
val schemaObj = new Schema.Parser().parse(new File(avscfilepath)); var sparkSchema : StructType = new StructType import scala.collection.JavaConversions._ for(field <- schemaObj.getFields()){ sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType) } sparkSchema
- 如何在使用JAR运行spark-submit时将程序参数传递给main函数?
- 使用sc.textFile以递归方式从子目录中获取文件内容
- 为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”?
- 不断增加YARN中Spark应用程序的物理内存
- 是否可以在Apache Spark中创建嵌套的RDD?
- 无法找到Web UI的资源路径:org / apache / spark / ui / static创建Spark应用程序时
- 如何下载dse.jar
- 与csv文件相比,将mysql表转换为spark数据集的速度非常慢
- 在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?