基于第二个Dataframe的DataFrame过滤

使用Spark SQL,我有两个dataframe,它们是从一个创建的,例如:

df = sqlContext.createDataFrame(...); df1 = df.filter("value = 'abc'"); //[path, value] df2 = df.filter("value = 'qwe'"); //[path, value] 

我想过滤df1,如果它的’path’的一部分是df2中的任何路径。 因此,如果df1具有路径’a / b / c / d / e’的行,我会发现在df2中是否是路径为’a / b / c’的行。 在SQL中应该是这样的

 SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2) 

其中udf是用户定义的函数,用于缩短df1的原始路径。 天真的解决方案是使用JOIN然后过滤结果,但它很慢,因为df1和df2每行都超过10mil。

我也试过以下代码,但首先我必须从df2创建广播变量

 static Broadcast bdf; bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext sqlContext.createDataFrame(df1.javaRDD().filter( new Function(){ @Override public Boolean call(Row row) throws Exception { String foo = shortenPath(row.getString(0)); return bdf.value().filter("path = '"+foo+"'").count()>0; } } ), myClass.class) 

我遇到的问题是,当评估返回时/当执行df2过滤时,Spark卡住了。

我想知道如何使用两个dataframe来完成此任务。 我真的想避免加入。 有任何想法吗?


编辑>>

在我的原始代码中,df1的别名是’first’,df2’cond’。 此连接不是笛卡儿,也不使用广播。

 df1 = df1.as("first"); df2 = df2.as("second"); df1.join(df2, df1.col("first.path"). lt(df2.col("second.path")) , "left_outer"). filter("isPrefix(first.path, second.path)"). na().drop("any"); 

isPrefix是udf

 UDF2 isPrefix = new UDF2() { @Override public Boolean call(String p, String s) throws Exception { //return true if (p.length()+4==s.length()) and s.contains(p) }}; 

shortenPath – 它在路径中剪切最后两个字符

 UDF1 shortenPath = new UDF1() { @Override public String call(String s) throws Exception { String[] foo = s.split("/"); String result = ""; for (int i = 0; i < foo.length-2; i++) { result += foo[i]; if(i<foo.length-3) result+="/"; } return result; } }; 

记录示例。 路径是独特的。

 a/a/a/b/c abc a/a/a qwe a/b/c/d/e abc a/b/c qwe a/b/b/k foo a/b/f/a bar ... 

所以df1的意思是

 a/a/a/b/c abc a/b/c/d/e abc ... 

和df2的构成

 a/a/a qwe a/b/c qwe ... 

您的代码至少有一些问题:

  • 您无法在其他操作或转换中执行操作或转换。 这意味着过滤广播的DataFrame根本无法工作,您应该得到一个例外。
  • join you use作为Cartesian产品执行,后跟filter。 由于Spark使用Hashing进行连接,因此只有基于相等的连接可以在没有Cartesian的情况下有效地执行。 它与为什么在SQL查询中使用UDF导致笛卡尔积相关?
  • 如果两个DataFrames都相对较大并且具有相似的大小,则广播不太可能有用。 请参阅为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢
  • 在性能方面并不重要,但isPrefix似乎错了。 特别是它看起来可以匹配前缀和后缀
  • col("first.path").lt(col("second.path"))条件看起来不对。 我假设您希望df1a/a/a a/a/a/b/cdf2中的a/a/a匹配。 如果是这样,它应该是gt不是。

可能你能做的最好的事情就是这样:

 import org.apache.spark.sql.functions.{col, regexp_extract} val df = sc.parallelize(Seq( ("a/a/a/b/c", "abc"), ("a/a/a","qwe"), ("a/b/c/d/e", "abc"), ("a/b/c", "qwe"), ("a/b/b/k", "foo"), ("a/b/f/a", "bar") )).toDF("path", "value") val df1 = df .where(col("value") === "abc") .withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1)) .as("df1") val df2 = df.where(col("value") === "qwe").as("df2") val joined = df1.join(df2, col("df1.path_short") === col("df2.path")) 

您可以尝试像这样广播其中一个表(Spark> = 1.5.0):

 import org.apache.spark.sql.functions.broadcast df1.join(broadcast(df2), col("df1.path_short") === col("df2.path")) 

并且增加了自动广播限制,但正如我上面提到的,它很可能不如普通的HashJoin效率低。

作为使用子查询实现IN一种可能方式,可以使用LEFT SEMI JOIN

  JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp"); SQLContext sqlContext = new SQLContext(javaSparkContext); StructType schema = DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("path", DataTypes.StringType, false), DataTypes.createStructField("value", DataTypes.StringType, false) }); // Prepare First DataFrame List dataForFirstDF = new ArrayList<>(); dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc")); dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc")); dataForFirstDF.add(RowFactory.create("x/y/z", "xyz")); DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema); // df1.show(); // // +---------+-----+ // | path|value| // +---------+-----+ // |a/a/a/b/c| abc| // |a/b/c/d/e| abc| // | x/y/z| xyz| // +---------+-----+ // Prepare Second DataFrame List dataForSecondDF = new ArrayList<>(); dataForSecondDF.add(RowFactory.create("a/a/a", "qwe")); dataForSecondDF.add(RowFactory.create("a/b/c", "qwe")); DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema); // Use left semi join to filter out df1 based on path in df2 Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path")); DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi"); // result.show(); // // +---------+-----+ // | path|value| // +---------+-----+ // |a/a/a/b/c| abc| // |a/b/c/d/e| abc| // +---------+-----+ 

此类查询的物理计划如下所示:

 == Physical Plan == Limit 21 ConvertToSafe LeftSemiJoinBNL Some(Contains(path#0, path#2)) ConvertToUnsafe Scan PhysicalRDD[path#0,value#1] TungstenProject [path#2] Scan PhysicalRDD[path#2,value#3] 

它将使用LeftSemiJoinBNL进行实际的连接操作,该操作应在内部广播值。 更多细节请参考Spark中的实际实现 – LeftSemiJoinBNL.scala

PS我不太明白需要删除最后两个字符,但是如果需要的话 – 可以这样做,就像@ zero323提议的那样(使用regexp_extract )。