如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?

我正在尝试编写一个返回复杂类型的UDF:

private val toPrice = UDF1<String, Map> { s -> val elements = s.split(" ") mapOf("value" to elements[0], "currency" to elements[1]) } val type = DataTypes.createStructType(listOf( DataTypes.createStructField("value", DataTypes.StringType, false), DataTypes.createStructField("currency", DataTypes.StringType, false))) df.sqlContext().udf().register("toPrice", toPrice, type) 

但是每次我用这个:

 df = df.withColumn("price", callUDF("toPrice", col("price"))) 

我得到一个神秘的错误:

 Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379) ... 19 more 

我尝试使用自定义数据类型:

 class Price(val value: Double, val currency: String) : Serializable 

使用返回该类型的UDF:

 private val toPrice = UDF1 { s -> val elements = s.split(" ") Price(elements[0].toDouble(), elements[1]) } 

但后来我得到另一个MatchError ,它抱怨Price类型。

如何正确编写可以返回复杂类型的UDF?

TL; DR该函数应返回类org.apache.spark.sql.Row的对象。

Spark提供了UDF定义的两个主要变体。

  1. 使用Scalareflection的udf变体:

    • def udf[RT](f: () ⇒ RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
    • def udf[RT, A1](f: (A1) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
    • def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10])

    哪个定义

    Scala将…参数关闭为用户定义的函数(UDF)。 根据Scala闭包的签名自动推断数据类型。

    这些变体在没有primefaces或代数数据类型的模式的情况下使用。 例如,有问题的函数将在Scala中定义:

     case class Price(value: Double, currency: String) val df = Seq("1 USD").toDF("price") val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Price(price.toDouble, currency) } }.toOption) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // +----------+ 

    在此变体中,返回类型是自动编码的。

    由于它依赖于reflection,因此该变体主要用于Scala用户。

  2. 提供模式定义的udf变体(您在此处使用的变体)。 此变体的返回类型应与Dataset[Row]的返回类型相同:

    • 正如在另一个答案中指出的那样,您只能使用SQL类型映射表中列出的类型 (primefaces类型为boxed或unboxed, java.sql.Timestamp / java.sql.Date ,以及高级集合)。

    • 复杂结构( structs / structs StructTypes )使用org.apache.spark.sql.Row表示。 不允许与代数数据类型或等效数据混合。 例如(Scala代码)

       struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>> 

      应该表达为

       Row(1, Row("foo", Row(-1.0, 42)))) 

       (1, ("foo", (-1.0, 42)))) 

      或任何混合变体,如

       Row(1, Row("foo", (-1.0, 42)))) 

    提供此变体主要是为了确保Java互操作性。

    在这种情况下(相当于有问题的那个),定义应类似于以下定义:

     import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf import org.apache.spark.sql.Row val schema = StructType(Seq( StructField("value", DoubleType, false), StructField("currency", StringType, false) )) val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Row(price.toDouble, currency) } }.getOrElse(null), schema) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // | null| // +----------+ 

    排除exception处理的所有细微差别(通常UDFs应该控制null输入,按照惯例优雅地处理格式错误的数据)Java等效应该看起来或多或少像这样:

     UserDefinedFunction price = udf((String s) -> { String[] split = s.split(" "); return RowFactory.create(Double.parseDouble(split[0]), split[1]); }, DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("value", DataTypes.DoubleType, true), DataTypes.createStructField("currency", DataTypes.StringType, true) })); 

背景

为了给你一些上下文,这个区别也反映在API的其他部分。 例如,您可以从架构和一系列Rows创建DataFrame

 def createDataFrame(rows: List[Row], schema: StructType): DataFrame 

或使用一系列Productsreflection

 def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame 

但不支持混合变体。

换句话说,您应该提供可以使用RowEncoder编码的输入。

当然你通常不会使用udf来完成像这样的任务:

 import org.apache.spark.sql.functions._ df.withColumn("price", struct( split($"price", " ")(0).cast("double").alias("price"), split($"price", " ")(1).alias("currency") )) 

相关

  • 在SQLContext之外的Java中创建SparkSQL UDF

很简单。 转到数据类型参考并找到相应的类型。

在Spark 2.3中

  • 如果将返回类型声明为StructType则函数必须返回org.apache.spark.sql.Row
  • 如果返回Map函数返回类型应该是MapType – 显然不是你想要的。