Spark – Java UDF返回多列
我正在使用sparkSql 1.6.2(Java API),我必须处理以下DataFrame,其中包含2列中的值列表:
ID AttributeName AttributeValue 0 [an1,an2,an3] [av1,av2,av3] 1 [bn1,bn2] [bv1,bv2]
所需的表是:
ID AttributeName AttributeValue 0 an1 av1 0 an2 av2 0 an3 av3 1 bn1 bv1 1 bn2 bv2
我想我必须结合使用explode函数和自定义UDF函数。
我找到了以下资源:
- 在Spark SQL表中爆炸(转置?)多个列
- 如何使用JAVA在Spark DataFrame上调用UDF?
我可以成功运行一个读取两列的示例,并返回列中前两个字符串的串联
UDF2 combineUDF = new UDF2<Seq, Seq, String>() { public String call(final Seq col1, final Seq col2) throws Exception { return col1.apply(0) + col2.apply(0); } }; context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
问题是写UDF的签名返回两列(在Java中)。 据我所知,我必须定义一个新的StructType,如下所示,并将其设置为返回类型,但到目前为止,我没有设法使最终的代码工作
StructType retSchema = new StructType(new StructField[]{ new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()), new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()), } );
context.udf()。register(“combineUDF”,combineUDF,retSchema);
任何帮助将非常感激。
更新:我正在尝试首先实现zip(AttributeName,AttributeValue),所以我只需要在sparkSql中应用标准的爆炸函数:
ID AttName_AttValue 0 [[an1,av1],[an1,av2],[an3,av3]] 1 [[bn1,bv1],[bn2,bv2]]
我构建了以下UDF:
UDF2 combineColumns = new UDF2<Seq, Seq, List<List>>() { public List<List> call(final Seq col1, final Seq col2) throws Exception { List<List> zipped = new LinkedList(); for (int i = 0, listSize = col1.size(); i < listSize; i++) { List subRow = Arrays.asList(col1.apply(i), col2.apply(i)); zipped.add(subRow); } return zipped; } };
但是当我运行代码时
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
我收到以下错误消息:
scala.MatchError:[[an1,av1],[an1,av2],[an3,av3]](类java.util.LinkedList)
看起来组合已经正确执行但是返回类型不是Scala中的预期类型。
任何帮助?
最后,我设法得到了我正在寻找的结果,但可能没有以最有效的方式。
基本上是2步骤:
- 邮编的两个清单
- 爆炸列中的行
第一步,我定义了以下UDF函数
UDF2 concatItems = new UDF2, Seq, Seq >() { public Seq call(final Seq col1, final Seq col2) throws Exception { ArrayList zipped = new ArrayList(); for (int i = 0, listSize = col1.size(); i < listSize; i++) { String subRow = col1.apply(i) + ";" + col2.apply(i); zipped.add(subRow); } return scala.collection.JavaConversions.asScalaBuffer(zipped); } };
缺少SparkSession的function注册:
sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);
然后我用以下代码调用它:
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
在这个阶段,df2看起来像这样:
ID AttName_AttValue 0 [[an1,av1],[an1,av2],[an3,av3]] 1 [[bn1,bv1],[bn2,bv2]]
然后我调用以下lambda函数将列表爆炸成行:
DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));
在这个阶段,df3看起来像这样:
ID AttName_AttValue 0 [an1,av1] 0 [an1,av2] 0 [an3,av3] 1 [bn1,bv1] 1 [bn2,bv2]
最后,为了将属性名称和值拆分为两个不同的列,我将DataFrame转换为JavaRDD以便使用map函数:
JavaRDD df3RDD = df3.toJavaRDD().map( (Function) myRow -> { String[] info = String.valueOf(myRow.get(1)).split(","); return RowFactory.create(myRow.get(0), info[0], info[1]); }).cache();
如果有人有更好的解决方案,请随时发表评论。 我希望它有所帮助。