Spark sql如何在不丢失空值的情况下爆炸
我有一个Dataframe,我试图压扁。 作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行。 例如,
id | name | likes _______________________________ 1 | Luke | [baseball, soccer]
应该成为
id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer
这是我的代码
private DataFrame explodeDataFrame(DataFrame df) { DataFrame resultDf = df; for (StructField field : df.schema().fields()) { if (field.dataType() instanceof ArrayType) { resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name()))); resultDf.show(); } } return resultDf; }
问题是在我的数据中,一些数组列有空值。 在这种情况下,整个行都将被删除。 所以这个dataframe:
id | name | likes _______________________________ 1 | Luke | [baseball, soccer] 2 | Lucy | null
变
id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer
代替
id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer 2 | Lucy | null
如何爆炸我的数组,以便我不会丢失空行?
我使用的是Spark 1.5.2和Java 8
Spark 2.2+
你可以使用explode_outer
函数:
import org.apache.spark.sql.functions.explode_outer df.withColumn("likes", explode_outer($"likes")).show // +---+----+--------+ // | id|name| likes| // +---+----+--------+ // | 1|Luke|baseball| // | 1|Luke| soccer| // | 2|Lucy| null| // +---+----+--------+
Spark <= 2.1
在Scala中,Java等效应该几乎相同(导入单个函数使用import static
)。
import org.apache.spark.sql.functions.{array, col, explode, lit, when} val df = Seq( (1, "Luke", Some(Array("baseball", "soccer"))), (2, "Lucy", None) ).toDF("id", "name", "likes") df.withColumn("likes", explode( when(col("likes").isNotNull, col("likes")) // If null explode an array with a single null .otherwise(array(lit(null).cast("string")))))
这里的想法基本上是用所需类型的array(NULL)
替换NULL
。 对于复杂类型(aka structs
),您必须提供完整的模式:
val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y") val st = StructType(Seq( StructField("_1", IntegerType, false), StructField("_2", StringType, true) )) dfStruct.withColumn("y", explode( when(col("y").isNotNull, col("y")) .otherwise(array(lit(null).cast(st)))))
要么
dfStruct.withColumn("y", explode( when(col("y").isNotNull, col("y")) .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
注意 :
如果已经将containsNull
设置为false
创建了数组Column
,则应首先更改它(使用Spark 2.1测试):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
按照接受的答案,当数组元素是复杂类型时,可能难以手动定义(例如,使用大型结构)。
为了自动执行,我编写了以下帮助方法:
def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = { val arrayFields = df.schema.fields .map(field => field.name -> field.dataType) .collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])} .toMap columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) => dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol)) .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType))))) }
- 使用Java将spark RDD保存到本地文件系统
- 无法读取工件描述符:IntelliJ
- apache spark MLLib:如何为字符串function构建标记点?
- Apache Spark中的矩阵乘法
- 如何在Java中的Apache Spark中将DataFrame转换为Dataset?
- java + spark:org.apache.spark.SparkException:作业已中止:任务不可序列化:java.io.NotSerializableException
- Spark 1.6-无法在hadoop二进制路径中找到winutils二进制文件
- Spark SQL失败,因为“常量池已超过JVM限制0xFFFF”
- 如何在GroupBy操作后从spark DataFrame列中收集字符串列表?