Spark sql如何在不丢失空值的情况下爆炸

我有一个Dataframe,我试图压扁。 作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行。 例如,

id | name | likes _______________________________ 1 | Luke | [baseball, soccer] 

应该成为

 id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer 

这是我的代码

 private DataFrame explodeDataFrame(DataFrame df) { DataFrame resultDf = df; for (StructField field : df.schema().fields()) { if (field.dataType() instanceof ArrayType) { resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); resultDf.show(); } } return resultDf; } 

问题是在我的数据中,一些数组列有空值。 在这种情况下,整个行都将被删除。 所以这个dataframe:

 id | name | likes _______________________________ 1 | Luke | [baseball, soccer] 2 | Lucy | null 

 id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer 

代替

 id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer 2 | Lucy | null 

如何爆炸我的数组,以便我不会丢失空行?

我使用的是Spark 1.5.2和Java 8

Spark 2.2+

你可以使用explode_outer函数:

 import org.apache.spark.sql.functions.explode_outer df.withColumn("likes", explode_outer($"likes")).show // +---+----+--------+ // | id|name| likes| // +---+----+--------+ // | 1|Luke|baseball| // | 1|Luke| soccer| // | 2|Lucy| null| // +---+----+--------+ 

Spark <= 2.1

在Scala中,Java等效应该几乎相同(导入单个函数使用import static )。

 import org.apache.spark.sql.functions.{array, col, explode, lit, when} val df = Seq( (1, "Luke", Some(Array("baseball", "soccer"))), (2, "Lucy", None) ).toDF("id", "name", "likes") df.withColumn("likes", explode( when(col("likes").isNotNull, col("likes")) // If null explode an array with a single null .otherwise(array(lit(null).cast("string"))))) 

这里的想法基本上是用所需类型的array(NULL)替换NULL 。 对于复杂类型(aka structs ),您必须提供完整的模式:

 val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") val st = StructType(Seq( StructField("_1", IntegerType, false), StructField("_2", StringType, true) )) dfStruct.withColumn("y", explode( when(col("y").isNotNull, col("y")) .otherwise(array(lit(null).cast(st))))) 

要么

 dfStruct.withColumn("y", explode( when(col("y").isNotNull, col("y")) .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

注意

如果已经将containsNull设置为false创建了数组Column ,则应首先更改它(使用Spark 2.1测试):

 df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 

按照接受的答案,当数组元素是复杂类型时,可能难以手动定义(例如,使用大型结构)。

为了自动执行,我编写了以下帮助方法:

  def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = { val arrayFields = df.schema.fields .map(field => field.name -> field.dataType) .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])} .toMap columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) => dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol)) .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType))))) }