如何在Java / Kotlin中创建一个返回复杂类型的Spark UDF?
我正在尝试编写一个返回复杂类型的UDF:
private val toPrice = UDF1<String, Map> { s -> val elements = s.split(" ") mapOf("value" to elements[0], "currency" to elements[1]) } val type = DataTypes.createStructType(listOf( DataTypes.createStructField("value", DataTypes.StringType, false), DataTypes.createStructField("currency", DataTypes.StringType, false))) df.sqlContext().udf().register("toPrice", toPrice, type)
但是每次我用这个:
df = df.withColumn("price", callUDF("toPrice", col("price")))
我得到一个神秘的错误:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379) ... 19 more
我尝试使用自定义数据类型:
class Price(val value: Double, val currency: String) : Serializable
使用返回该类型的UDF:
private val toPrice = UDF1 { s -> val elements = s.split(" ") Price(elements[0].toDouble(), elements[1]) }
但后来我得到另一个MatchError
,它抱怨Price
类型。
如何正确编写可以返回复杂类型的UDF?
TL; DR该函数应返回类org.apache.spark.sql.Row
的对象。
Spark提供了UDF
定义的两个主要变体。
-
使用Scalareflection的
udf
变体:-
def udf[RT](f: () ⇒ RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
-
def udf[RT, A1](f: (A1) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
- …
-
def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10])
哪个定义
Scala将…参数关闭为用户定义的函数(UDF)。 根据Scala闭包的签名自动推断数据类型。
这些变体在没有primefaces或代数数据类型的模式的情况下使用。 例如,有问题的函数将在Scala中定义:
case class Price(value: Double, currency: String) val df = Seq("1 USD").toDF("price") val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Price(price.toDouble, currency) } }.toOption) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // +----------+
在此变体中,返回类型是自动编码的。
由于它依赖于reflection,因此该变体主要用于Scala用户。
-
-
提供模式定义的
udf
变体(您在此处使用的变体)。 此变体的返回类型应与Dataset[Row]
的返回类型相同:-
正如在另一个答案中指出的那样,您只能使用SQL类型映射表中列出的类型 (primefaces类型为boxed或unboxed,
java.sql.Timestamp
/java.sql.Date
,以及高级集合)。 -
复杂结构(
structs
/structs
StructTypes
)使用org.apache.spark.sql.Row
表示。 不允许与代数数据类型或等效数据混合。 例如(Scala代码)struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>
应该表达为
Row(1, Row("foo", Row(-1.0, 42))))
不
(1, ("foo", (-1.0, 42))))
或任何混合变体,如
Row(1, Row("foo", (-1.0, 42))))
提供此变体主要是为了确保Java互操作性。
在这种情况下(相当于有问题的那个),定义应类似于以下定义:
import org.apache.spark.sql.types._ import org.apache.spark.sql.functions.udf import org.apache.spark.sql.Row val schema = StructType(Seq( StructField("value", DoubleType, false), StructField("currency", StringType, false) )) val toPrice = udf((s: String) => scala.util.Try { s split(" ") match { case Array(price, currency) => Row(price.toDouble, currency) } }.getOrElse(null), schema) df.select(toPrice($"price")).show // +----------+ // |UDF(price)| // +----------+ // |[1.0, USD]| // | null| // +----------+
排除exception处理的所有细微差别(通常
UDFs
应该控制null
输入,按照惯例优雅地处理格式错误的数据)Java等效应该看起来或多或少像这样:UserDefinedFunction price = udf((String s) -> { String[] split = s.split(" "); return RowFactory.create(Double.parseDouble(split[0]), split[1]); }, DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("value", DataTypes.DoubleType, true), DataTypes.createStructField("currency", DataTypes.StringType, true) }));
-
背景 :
为了给你一些上下文,这个区别也反映在API的其他部分。 例如,您可以从架构和一系列Rows
创建DataFrame
:
def createDataFrame(rows: List[Row], schema: StructType): DataFrame
或使用一系列Products
reflection
def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame
但不支持混合变体。
换句话说,您应该提供可以使用RowEncoder
编码的输入。
当然你通常不会使用udf
来完成像这样的任务:
import org.apache.spark.sql.functions._ df.withColumn("price", struct( split($"price", " ")(0).cast("double").alias("price"), split($"price", " ")(1).alias("currency") ))
相关 :
- 在SQLContext之外的Java中创建SparkSQL UDF
很简单。 转到数据类型参考并找到相应的类型。
在Spark 2.3中
- 如果将返回类型声明为
StructType
则函数必须返回org.apache.spark.sql.Row
。 - 如果返回
Map
函数返回类型应该是MapType
– 显然不是你想要的。