Spark – Java UDF返回多列

我正在使用sparkSql 1.6.2(Java API),我必须处理以下DataFrame,其中包含2列中的值列表:

ID AttributeName AttributeValue 0 [an1,an2,an3] [av1,av2,av3] 1 [bn1,bn2] [bv1,bv2] 

所需的表是:

 ID AttributeName AttributeValue 0 an1 av1 0 an2 av2 0 an3 av3 1 bn1 bv1 1 bn2 bv2 

我想我必须结合使用explode函数和自定义UDF函数。

我找到了以下资源:

  • 在Spark SQL表中爆炸(转置?)多个列
  • 如何使用JAVA在Spark DataFrame上调用UDF?

我可以成功运行一个读取两列的示例,并返回列中前两个字符串的串联

  UDF2 combineUDF = new UDF2<Seq, Seq, String>() { public String call(final Seq col1, final Seq col2) throws Exception { return col1.apply(0) + col2.apply(0); } }; context.udf().register("combineUDF", combineUDF, DataTypes.StringType); 

问题是写UDF的签名返回两列(在Java中)。 据我所知,我必须定义一个新的StructType,如下所示,并将其设置为返回类型,但到目前为止,我没有设法使最终的代码工作

 StructType retSchema = new StructType(new StructField[]{ new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()), new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()), } ); 

context.udf()。register(“combineUDF”,combineUDF,retSchema);

任何帮助将非常感激。

更新:我正在尝试首先实现zip(AttributeName,AttributeValue),所以我只需要在sparkSql中应用标准的爆炸函数:

 ID AttName_AttValue 0 [[an1,av1],[an1,av2],[an3,av3]] 1 [[bn1,bv1],[bn2,bv2]] 

我构建了以下UDF:

 UDF2 combineColumns = new UDF2<Seq, Seq, List<List>>() { public List<List> call(final Seq col1, final Seq col2) throws Exception { List<List> zipped = new LinkedList(); for (int i = 0, listSize = col1.size(); i < listSize; i++) { List subRow = Arrays.asList(col1.apply(i), col2.apply(i)); zipped.add(subRow); } return zipped; } }; 

但是当我运行代码时

 myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10); 

我收到以下错误消息:

scala.MatchError:[[an1,av1],[an1,av2],[an3,av3]](类java.util.LinkedList)

看起来组合已经正确执行但是返回类型不是Scala中的预期类型。

任何帮助?

最后,我设法得到了我正在寻找的结果,但可能没有以最有效的方式。

基本上是2步骤:

  • 邮编的两个清单
  • 爆炸列中的行

第一步,我定义了以下UDF函数

 UDF2 concatItems = new UDF2, Seq, Seq>() { public Seq call(final Seq col1, final Seq col2) throws Exception { ArrayList zipped = new ArrayList(); for (int i = 0, listSize = col1.size(); i < listSize; i++) { String subRow = col1.apply(i) + ";" + col2.apply(i); zipped.add(subRow); } return scala.collection.JavaConversions.asScalaBuffer(zipped); } }; 

缺少SparkSession的function注册:

 sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType); 

然后我用以下代码调用它:

 DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue")); 

在这个阶段,df2看起来像这样:

 ID AttName_AttValue 0 [[an1,av1],[an1,av2],[an3,av3]] 1 [[bn1,bv1],[bn2,bv2]] 

然后我调用以下lambda函数将列表爆炸成行:

  DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row")); 

在这个阶段,df3看起来像这样:

 ID AttName_AttValue 0 [an1,av1] 0 [an1,av2] 0 [an3,av3] 1 [bn1,bv1] 1 [bn2,bv2] 

最后,为了将属性名称和值拆分为两个不同的列,我将DataFrame转换为JavaRDD以便使用map函数:

 JavaRDD df3RDD = df3.toJavaRDD().map( (Function) myRow -> { String[] info = String.valueOf(myRow.get(1)).split(","); return RowFactory.create(myRow.get(0), info[0], info[1]); }).cache(); 

如果有人有更好的解决方案,请随时发表评论。 我希望它有所帮助。