将JavaRDD转换为DataFrame时出现Spark错误:java.util.Arrays $ ArrayList不是数组模式的有效外部类型

我使用的是Spark 2.1.0。 对于以下代码,它读取文本文件并将内容转换为DataFrame,然后输入Word2Vector模型:

SparkSession spark = SparkSession.builder().appName("word2vector").getOrCreate(); JavaRDD lines = spark.sparkContext().textFile("input.txt", 10).toJavaRDD(); JavaRDD<List> lists = lines.map(new Function<String, List>(){ public List call(String line){ List list = Arrays.asList(line.split(" ")); return list; } }); JavaRDD rows = lists.map(new Function<List, Row>() { public Row call(List list) { return RowFactory.create(list); } }); StructType schema = new StructType(new StructField[] { new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) }); Dataset input = spark.createDataFrame(rows, schema); input.show(3); Word2Vec word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(100).setMinCount(0); Word2VecModel model = word2Vec.fit(input); Dataset result = model.transform(input); 

它引发了一个例外

java.lang.RuntimeException:编码时出错:java.util.Arrays $ ArrayList不是数组模式的有效外部类型

它发生在input.show(3) ,因此createDataFrame()导致exception,因为Arrays.asList()返回一个Arrays $ ArrayList ,这里不支持。 但Spark官方文档具有以下代码:

 List data = Arrays.asList( RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))), RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))), RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" "))) ); StructType schema = new StructType(new StructField[]{ new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) }); Dataset documentDF = spark.createDataFrame(data, schema); 

哪个工作得很好。 如果不支持Arrays $ ArrayList ,那么这段代码是如何工作的呢? 区别在于我正在将JavaRDD转换为DataFrame,但官方文档正在将List转换为DataFrame。 我相信Spark Java API有一个重载方法createDataFrame(),它接受JavaRDD并根据提供的模式将其转换为DataFrame。 我很困惑为什么它不起作用。 有人可以帮忙吗?

几天前我遇到了同样的问题,解决这个问题的唯一方法是使用数组数组。 为什么? 以下是回复:

ArrayType是Scala Arrays的包装器,它与Java数组一一对应。 默认情况下,Java ArrayList未映射到Scala Array,这就是您获得exception的原因:

java.util.Arrays $ ArrayList不是数组模式的有效外部类型

因此,直接传递String []可能有效:

 RowFactory.create(line.split(" ")) 

但是由于create将一个Object列表作为输入,因此一行可能有一个列列表,String []会被解释为一个String列列表。 这就是为什么需要一个双数组String:

 RowFactory.create(new String[][] {line.split(" ")}) 

但是,仍然是从Spark文档中的Java List行构建DataFrame的谜团。 这是因为SparkSession.createDataFrame函数版本作为行的第一个参数java.util.List进行特殊类型检查和转换,以便将所有Java Iterable(如此ArrayList)转换为Scala数组。 但是,使用JavaRDD的SparkSession.createDataFrame将行内容直接映射到DataFrame。

总结一下,这是正确的版本:

  SparkSession spark = SparkSession.builder().master("local[*]").appName("Word2Vec").getOrCreate(); SparkContext sc = spark.sparkContext(); sc.setLogLevel("WARN"); JavaRDD lines = sc.textFile("input.txt", 10).toJavaRDD(); JavaRDD rows = lines.map(new Function(){ public Row call(String line){ return RowFactory.create(new String[][] {line.split(" ")}); } }); StructType schema = new StructType(new StructField[] { new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty()) }); Dataset input = spark.createDataFrame(rows, schema); input.show(3); 

希望这能解决你的问题。

正如错误所说的那样。 ArrayList不等同于Scala的数组。 您应该使用普通数组(即String[] )。