基于第二个Dataframe的DataFrame过滤
使用Spark SQL,我有两个dataframe,它们是从一个创建的,例如:
df = sqlContext.createDataFrame(...); df1 = df.filter("value = 'abc'"); //[path, value] df2 = df.filter("value = 'qwe'"); //[path, value]
我想过滤df1,如果它的’path’的一部分是df2中的任何路径。 因此,如果df1具有路径’a / b / c / d / e’的行,我会发现在df2中是否是路径为’a / b / c’的行。 在SQL中应该是这样的
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
其中udf是用户定义的函数,用于缩短df1的原始路径。 天真的解决方案是使用JOIN然后过滤结果,但它很慢,因为df1和df2每行都超过10mil。
我也试过以下代码,但首先我必须从df2创建广播变量
static Broadcast bdf; bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext sqlContext.createDataFrame(df1.javaRDD().filter( new Function(){ @Override public Boolean call(Row row) throws Exception { String foo = shortenPath(row.getString(0)); return bdf.value().filter("path = '"+foo+"'").count()>0; } } ), myClass.class)
我遇到的问题是,当评估返回时/当执行df2过滤时,Spark卡住了。
我想知道如何使用两个dataframe来完成此任务。 我真的想避免加入。 有任何想法吗?
编辑>>
在我的原始代码中,df1的别名是’first’,df2’cond’。 此连接不是笛卡儿,也不使用广播。
df1 = df1.as("first"); df2 = df2.as("second"); df1.join(df2, df1.col("first.path"). lt(df2.col("second.path")) , "left_outer"). filter("isPrefix(first.path, second.path)"). na().drop("any");
isPrefix是udf
UDF2 isPrefix = new UDF2() { @Override public Boolean call(String p, String s) throws Exception { //return true if (p.length()+4==s.length()) and s.contains(p) }};
shortenPath – 它在路径中剪切最后两个字符
UDF1 shortenPath = new UDF1() { @Override public String call(String s) throws Exception { String[] foo = s.split("/"); String result = ""; for (int i = 0; i < foo.length-2; i++) { result += foo[i]; if(i<foo.length-3) result+="/"; } return result; } };
记录示例。 路径是独特的。
a/a/a/b/c abc a/a/a qwe a/b/c/d/e abc a/b/c qwe a/b/b/k foo a/b/f/a bar ...
所以df1的意思是
a/a/a/b/c abc a/b/c/d/e abc ...
和df2的构成
a/a/a qwe a/b/c qwe ...
您的代码至少有一些问题:
- 您无法在其他操作或转换中执行操作或转换。 这意味着过滤广播的
DataFrame
根本无法工作,您应该得到一个例外。 -
join
you use作为Cartesian产品执行,后跟filter。 由于Spark使用Hashing
进行连接,因此只有基于相等的连接可以在没有Cartesian的情况下有效地执行。 它与为什么在SQL查询中使用UDF导致笛卡尔积相关? - 如果两个
DataFrames
都相对较大并且具有相似的大小,则广播不太可能有用。 请参阅为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢 - 在性能方面并不重要,但
isPrefix
似乎错了。 特别是它看起来可以匹配前缀和后缀 -
col("first.path").lt(col("second.path"))
条件看起来不对。 我假设您希望df1
中a/a/a
a/a/a/b/c
与df2
中的a/a/a
匹配。 如果是这样,它应该是gt
不是。
可能你能做的最好的事情就是这样:
import org.apache.spark.sql.functions.{col, regexp_extract} val df = sc.parallelize(Seq( ("a/a/a/b/c", "abc"), ("a/a/a","qwe"), ("a/b/c/d/e", "abc"), ("a/b/c", "qwe"), ("a/b/b/k", "foo"), ("a/b/f/a", "bar") )).toDF("path", "value") val df1 = df .where(col("value") === "abc") .withColumn("path_short", regexp_extract(col("path"), "^(.*)(/.){2}$", 1)) .as("df1") val df2 = df.where(col("value") === "qwe").as("df2") val joined = df1.join(df2, col("df1.path_short") === col("df2.path"))
您可以尝试像这样广播其中一个表(Spark> = 1.5.0):
import org.apache.spark.sql.functions.broadcast df1.join(broadcast(df2), col("df1.path_short") === col("df2.path"))
并且增加了自动广播限制,但正如我上面提到的,它很可能不如普通的HashJoin
效率低。
作为使用子查询实现IN
一种可能方式,可以使用LEFT SEMI JOIN
:
JavaSparkContext javaSparkContext = new JavaSparkContext("local", "testApp"); SQLContext sqlContext = new SQLContext(javaSparkContext); StructType schema = DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("path", DataTypes.StringType, false), DataTypes.createStructField("value", DataTypes.StringType, false) }); // Prepare First DataFrame List dataForFirstDF = new ArrayList<>(); dataForFirstDF.add(RowFactory.create("a/a/a/b/c", "abc")); dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc")); dataForFirstDF.add(RowFactory.create("x/y/z", "xyz")); DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema); // df1.show(); // // +---------+-----+ // | path|value| // +---------+-----+ // |a/a/a/b/c| abc| // |a/b/c/d/e| abc| // | x/y/z| xyz| // +---------+-----+ // Prepare Second DataFrame List
dataForSecondDF = new ArrayList<>(); dataForSecondDF.add(RowFactory.create("a/a/a", "qwe")); dataForSecondDF.add(RowFactory.create("a/b/c", "qwe")); DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema); // Use left semi join to filter out df1 based on path in df2 Column pathContains = functions.column("firstDF.path").contains(functions.column("secondDF.path")); DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi"); // result.show(); // // +---------+-----+ // | path|value| // +---------+-----+ // |a/a/a/b/c| abc| // |a/b/c/d/e| abc| // +---------+-----+
此类查询的物理计划如下所示:
== Physical Plan == Limit 21 ConvertToSafe LeftSemiJoinBNL Some(Contains(path#0, path#2)) ConvertToUnsafe Scan PhysicalRDD[path#0,value#1] TungstenProject [path#2] Scan PhysicalRDD[path#2,value#3]
它将使用LeftSemiJoinBNL进行实际的连接操作,该操作应在内部广播值。 更多细节请参考Spark中的实际实现 – LeftSemiJoinBNL.scala
PS我不太明白需要删除最后两个字符,但是如果需要的话 – 可以这样做,就像@ zero323提议的那样(使用regexp_extract
)。