如何使用JAVA在Spark DataFrame上调用UDF?
和这里一样的问题,但没有足够的意见在那里发表评论。 根据最新的Spark 文档 ,udf()可以以两种不同的方式使用,一种使用SQL,另一种使用DataFrame。 我发现了多个如何使用udf()和sql的示例,但是还没有找到关于如何直接在DataFrame上使用udf()的任何内容。 op在上面链接的问题上提供的解决方案使用了不推荐使用的callUDF() ,并且将根据Spark Java API文档在Spark 2.0中删除。 在那里,它说“因为它与udf()是多余的”,所以这意味着我应该能够使用udf()来调用我的udf(),但是无法弄清楚如何做到这一点。 我发现使用来自Java的Spark是非常不值得的..无休止的谷歌搜索每一步只是为了弄清楚如何进行明显的操作……每一步所忍受的痛苦都不会减轻下一步所需的痛苦!! 我没有偶然发现一些拼写Java-Spark程序语法的东西。 我错过了什么?
import org.apache.spark.sql.api.java.UDF1; . . UDF1 mode = new UDF1() { public String call(final String[] types) throws Exception { return types[0]; } }; sqlContext.udf().register("mode", mode, DataTypes.StringType); df.???????? how do I call my udf (mode) on a given column of my DataFrame df?
Spark> = 2.3
可以直接调用Scala样式的udf
:
import static org.apache.spark.sql.functions.*; import org.apache.spark.sql.expressions.UserDefinedFunction; UserDefinedFunction mode = udf( (Seq ss) -> ss.headOption(), DataTypes.StringType ); df.select(mode.apply(col("vs"))).show();
Spark <2.3
即使我们假设您的UDF很有用并且不能被简单的getItem
调用替换它也有不正确的签名。 使用Scala WrappedArray
而不是普通的Java Arrays公开数组列,因此您必须调整签名:
UDF1 mode = new UDF1, String>() { public String call(final Seq types) throws Exception { return types.headOption(); } };
如果UDF已经注册:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
你可以简单地使用callUDF
(1.5中引入的新函数)来按名称调用它:
df.select(callUDF("mode", col("vs"))).show();
您也可以在selectExprs
使用它:
df.selectExpr("mode(vs)").show();
- 如何使用spark处理一系列hbase行?
- TaskSchedulerImpl:初始作业未接受任何资源;
- RDD不可序列化的Cassandra / Spark连接器java API
- Spark提交失败,包含java.lang.NoSuchMethodError:scala.Predef $。$ conforms()Lscala / Predef $$ less $ colon $ less;
- 序列化RDD
- 使用Java从另一个应用程序部署Apache Spark应用程序,这是最佳实践
- Spark SQL失败,因为“常量池已超过JVM限制0xFFFF”
- 此语言级别不支持Lambda表达式
- 如何读取嵌套的JSON以进行聚合?