如何使用JAVA在Spark DataFrame上调用UDF?

和这里一样的问题,但没有足够的意见在那里发表评论。 根据最新的Spark 文档 ,udf()可以以两种不同的方式使用,一种使用SQL,另一种使用DataFrame。 我发现了多个如何使用udf()和sql的示例,但是还没有找到关于如何直接在DataFrame上使用udf()的任何内容。 op在上面链接的问题上提供的解决方案使用了不推荐使用的callUDF() ,并且将根据Spark Java API文档在Spark 2.0中删除。 在那里,它说“因为它与udf()是多余的”,所以这意味着我应该能够使用udf()来调用我的udf(),但是无法弄清楚如何做到这一点。 我发现使用来自Java的Spark是非常不值得的..无休止的谷歌搜索每一步只是为了弄清楚如何进行明显的操作……每一步所忍受的痛苦都不会减轻下一步所需的痛苦!! 我没有偶然发现一些拼写Java-Spark程序语法的东西。 我错过了什么?

import org.apache.spark.sql.api.java.UDF1; . . UDF1 mode = new UDF1() { public String call(final String[] types) throws Exception { return types[0]; } }; sqlContext.udf().register("mode", mode, DataTypes.StringType); df.???????? how do I call my udf (mode) on a given column of my DataFrame df? 

Spark> = 2.3

可以直接调用Scala样式的udf

 import static org.apache.spark.sql.functions.*; import org.apache.spark.sql.expressions.UserDefinedFunction; UserDefinedFunction mode = udf( (Seq ss) -> ss.headOption(), DataTypes.StringType ); df.select(mode.apply(col("vs"))).show(); 

Spark <2.3

即使我们假设您的UDF很有用并且不能被简单的getItem调用替换它也有不正确的签名。 使用Scala WrappedArray而不是普通的Java Arrays公开数组列,因此您必须调整签名:

 UDF1 mode = new UDF1, String>() { public String call(final Seq types) throws Exception { return types.headOption(); } }; 

如果UDF已经注册:

 sqlContext.udf().register("mode", mode, DataTypes.StringType); 

你可以简单地使用callUDF (1.5中引入的新函数)来按名称调用它:

 df.select(callUDF("mode", col("vs"))).show(); 

您也可以在selectExprs使用它:

 df.selectExpr("mode(vs)").show();