Spark SQL:镶嵌错误的嵌套类

我似乎无法写一个JavaRDD ,其中T是一个说法, Person类。 我把它定义为

 public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private String age; private Address address; .... 

Address

 public class Address implements Serializable { private static final long serialVersionUID = 1L; private String City; private String Block; ... 

然后我像这样创建一个JavaRDD

 JavaRDD people = sc.textFile("/user/johndoe/spark/data/people.txt").map(new Function() { public Person call(String line) { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge("2"); Address address = new Address("HomeAdd","141H"); person.setAddress(address); return person; } }); 

注意 – 我手动设置Address相同。 这基本上是一个嵌套的RDD。 试图将其保存为镶木地板文件:

 DataFrame dfschemaPeople = sqlContext.createDataFrame(people, Person.class); dfschemaPeople.write().parquet("/user/johndoe/spark/data/out/people.parquet"); 

地址类是:

 import java.io.Serializable; public class Address implements Serializable { public Address(String city, String block) { super(); City = city; Block = block; } private static final long serialVersionUID = 1L; private String City; private String Block; //Omitting getters and setters } 

我遇到错误:

引起:java.lang.ClassCastException: com.test.schema.Address无法强制转换为org.apache.spark.sql.Row

我正在运行spark-1.4.1。

  • 这是一个已知的错误?
  • 如果我通过导入相同格式的嵌套JSON文件来执行相同操作,我可以保存到镶木地板。
  • 即使我创建了一个子DataFrame,如: DataFrame dfSubset = sqlContext.sql("SELECT address.city FROM PersonTable"); 我仍然得到同样的错误

什么赋予了什么? 如何从文本文件中读取复杂的数据结构并另存为镶木地板? 似乎我不能这样做。

您正在使用具有限制的java api

来自spark文档: http : //spark.apache.org/docs/1.4.1/sql-programming-guide.html#interoperating-with-rdds

Spark SQL支持自动将JavaBeans的RDD转换为DataFrame。 使用reflection获得的BeanInfo定义了表的模式。 目前,Spark SQL不支持包含嵌套或包含复杂类型(如Lists或Arrays)的JavaBean。 您可以通过创建实现Serializable的类来创建JavaBean,并为其所有字段设置getter和setter。 使用scala案例类它将起作用(更新为写入镶木地板格式)

 import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD case class Address(city:String, block:String); case class Person(name:String,age:String, address:Address); object Test2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Simple Application").setMaster("local"); val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc); import sqlContext.implicits._ val people = sc.parallelize(List(Person("a", "b", Address("a", "b")), Person("c", "d", Address("c", "d")))); val df = sqlContext.createDataFrame(people); df.write.mode("overwrite").parquet("/tmp/people.parquet") } }