Tag: spark dataframe

加入一个dataframespark java

首先,感谢您抽出时间阅读我的问题。 我的问题如下:在Spark with Java中,我在两个dataframe中加载了两个csv文件的数据。 这些数据框将具有以下信息。 Dataframe机场 Id | Name | City ———————– 1 | Barajas | Madrid Dataframe airport_city_state City | state —————- Madrid | España 我想加入这两个dataframe,使它看起来像这样: dataframe结果 Id | Name | City | state ————————– 1 | Barajas | Madrid | España 其中dfairport.city = dfaiport_city_state.city 但是我不能用语法来澄清所以我可以正确地进行连接。 我如何创建变量的一些代码: // Load the csv, you have to […]

基于第二个Dataframe的DataFrame过滤

使用Spark SQL,我有两个dataframe,它们是从一个创建的,例如: df = sqlContext.createDataFrame(…); df1 = df.filter(“value = ‘abc'”); //[path, value] df2 = df.filter(“value = ‘qwe'”); //[path, value] 我想过滤df1,如果它的’path’的一部分是df2中的任何路径。 因此,如果df1具有路径’a / b / c / d / e’的行,我会发现在df2中是否是路径为’a / b / c’的行。 在SQL中应该是这样的 SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2) 其中udf是用户定义的函数,用于缩短df1的原始路径。 天真的解决方案是使用JOIN然后过滤结果,但它很慢,因为df1和df2每行都超过10mil。 我也试过以下代码,但首先我必须从df2创建广播变量 static Broadcast bdf; bdf = sc.broadcast(df2); //variable […]

Spark – 使用数据框语法进行HAVING分组?

在没有sql / hiveContext的Spark中使用groupby-having的语法是什么? 我知道我能做到 DataFrame df = some_df df.registreTempTable(“df”); df1 = sqlContext.sql(“SELECT * FROM df GROUP BY col1 HAVING some stuff”) 但是我怎么用这样的语法来做呢 df = df.select(df.col(“*”)).groupBy(df.col(“col1”)).having(“some stuff”) ? 这个.having()似乎不存在。

如何在Java中的Apache Spark中将DataFrame转换为Dataset?

我可以很容易地将Scala中的DataFrame转换为Dataset: case class Person(name:String, age:Long) val df = ctx.read.json(“/tmp/persons.json”) val ds = df.as[Person] ds.printSchema 但在Java版本中我不知道如何将Dataframe转换为Dataset? 任何想法? 我的努力是: DataFrame df = ctx.read().json(logFile); Encoder encoder = new Encoder(); Dataset ds = new Dataset(ctx,df.logicalPlan(),encoder); ds.printSchema(); 但是编译器说: Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated 编辑(解决方案): 基于@Leet-Falcon的解决方案答案: DataFrame df = ctx.read().json(logFile); Encoder encoder = Encoders.bean(Person.class); Dataset ds = […]

Apache Spark中的数据集

Dataset ds = sc.read().json(“path”).as(Encoders.bean(Tweet.class)); ds.show(); JavaRDD dstry = ds.toJavaRDD(); System.out.println(dstry.first().getClass()); Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File ‘generated.java’, Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File ‘generated.java’, Line 50, Column 16: No applicable constructor/method found for actual parameters “org.apache.spark.unsafe.types.UTF8String”; candidates are: “public void sparkSQL.Tweet.setId(long)” at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) […]

在同一Master下的Java和R Apps之间共享SparkContext

所以这是设置。 目前我已经初始化了两个Spark应用程序。 我需要在它们之间传递数据(最好通过共享的sparkcontext / sqlcontext,这样我就可以查询临时表)。 我目前使用Parquet文件进行dataframe传输,但是有可能采用其他方式吗? MasterURL指向同一个SparkMaster 通过终端启动Spark: /opt/spark/sbin/start-master.sh; /opt/spark/sbin/start-slave.sh spark://`hostname`:7077 Java App设置: JavaSparkContext context = new JavaSparkContext(conf); //conf = setMaster(MasterURL), 6G memory, and 4 cores. SQLContext sqlContext = new SQLContext(parentContext.sc()); 然后我稍后注册一个现有的框架 //existing dataframe to temptable df.registerTempTable(“table”); 和 SparkR sc <- sparkR.init(master='MasterURL', sparkEnvir=list(spark.executor.memory='6G', spark.cores.max='4') sqlContext <- sparkRSQL.init(sc) # attempt to get temptable df <- sql(sqlContext, […]

如何使用Java在Spark SQL中加入多列以在DataFrame中进行过滤

DataFrame a =包含列x,y,z,k DataFrame b =包含列x,y,a a.join(b,) ??? 我试过用 a.join(b,a.col(“x”).equalTo(b.col(“x”)) && a.col(“y”).equalTo(b.col(“y”),”inner”) 但Java正在抛出错误,说&&是不允许的。 有人可以帮忙吗? 谢谢

为什么SparkSession为一个动作执行两次?

最近升级到Spark 2.0,我在尝试从JSON字符串创建一个简单的数据集时看到了一些奇怪的行为。 这是一个简单的测试用例: SparkSession spark = SparkSession.builder().appName(“test”).master(“local[1]”).getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD rdd = sc.parallelize(Arrays.asList( “{\”name\”:\”tom\”,\”title\”:\”engineer\”,\”roles\”:[\”designer\”,\”developer\”]}”, “{\”name\”:\”jack\”,\”title\”:\”cto\”,\”roles\”:[\”designer\”,\”manager\”]}” )); JavaRDD mappedRdd = rdd.map(json -> { System.out.println(“mapping json: ” + json); return json; }); Dataset data = spark.read().json(mappedRdd); data.show(); 并输出: mapping json: {“name”:”tom”,”title”:”engineer”,”roles”:[“designer”,”developer”]} mapping json: {“name”:”jack”,”title”:”cto”,”roles”:[“designer”,”manager”]} mapping json: {“name”:”tom”,”title”:”engineer”,”roles”:[“designer”,”developer”]} mapping json: {“name”:”jack”,”title”:”cto”,”roles”:[“designer”,”manager”]} +—-+——————–+——–+ |name| roles| title| […]

Spark sql如何在不丢失空值的情况下爆炸

我有一个Dataframe,我试图压扁。 作为整个过程的一部分,我想爆炸它,所以如果我有一列数组,那么数组的每个值都将用于创建一个单独的行。 例如, id | name | likes _______________________________ 1 | Luke | [baseball, soccer] 应该成为 id | name | likes _______________________________ 1 | Luke | baseball 1 | Luke | soccer 这是我的代码 private DataFrame explodeDataFrame(DataFrame df) { DataFrame resultDf = df; for (StructField field : df.schema().fields()) { if (field.dataType() instanceof ArrayType) { resultDf = […]