Spark SQL:镶嵌错误的嵌套类
我似乎无法写一个JavaRDD
,其中T是一个说法, Person
类。 我把它定义为
public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private String age; private Address address; ....
Address
:
public class Address implements Serializable { private static final long serialVersionUID = 1L; private String City; private String Block; ...
然后我像这样创建一个JavaRDD
:
JavaRDD people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function() { public Person call(String line) { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge("2"); Address address = new Address("HomeAdd","141H"); person.setAddress(address); return person; } });
注意 – 我手动设置Address
相同。 这基本上是一个嵌套的RDD。 试图将其保存为镶木地板文件:
DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class); dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet");
地址类是:
import java.io.Serializable; public class Address implements Serializable { public Address(String city, String block) { super(); City = city; Block = block; } private static final long serialVersionUID = 1L; private String City; private String Block; //Omitting getters and setters }
我遇到错误:
引起:java.lang.ClassCastException: com.test.schema.Address无法强制转换为org.apache.spark.sql.Row
我正在运行spark-1.4.1。
- 这是一个已知的错误?
- 如果我通过导入相同格式的嵌套JSON文件来执行相同操作,我可以保存到镶木地板。
- 即使我创建了一个子DataFrame,如:
DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable");
我仍然得到同样的错误
什么赋予了什么? 如何从文本文件中读取复杂的数据结构并另存为镶木地板? 似乎我不能这样做。
您正在使用具有限制的java api
来自spark文档: http : //spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds
Spark SQL支持自动将JavaBeans的RDD转换为DataFrame。 使用reflection获得的BeanInfo定义了表的模式。 目前,Spark SQL不支持包含嵌套或包含复杂类型(如Lists或Arrays)的JavaBean。 您可以通过创建实现Serializable的类来创建JavaBean,并为其所有字段设置getter和setter。 使用scala案例类它将起作用(更新为写入镶木地板格式)
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD case class Address(city:String, block:String); case class Person(name:String,age:String, address:Address); object Test2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Simple Application").setMaster("local"); val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc); import sqlContext.implicits._ val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d")))); val df = sqlContext.createDataFrame(people); df.write.mode("overwrite").parquet("/tmp/people.parquet") } }