Tag: apache spark

如何使用java创建一个简单的spark graphframe?

基本上我是一名java开发人员,现在我有机会参与Spark并且我经历了Spark api的基础知识,比如SparkConfig,SparkContaxt,RDD,SQLContaxt,DataFrame,DataSet然后我能够执行一些简单的简单转换RDD,SQL ….但是当我尝试使用java训练一些示例graphframe应用程序时,我可以成功并且我经历了很多youtube教程,论坛和stackoverflow线程但没有我没有找到任何直接建议当我尝试为GraphFrame类创建一个对象时,我实际上遇到了这个问题,我也下载了接收jar( graphframes-0.2.0-spark2.0-s_2.11.jar ),但现在仍然面临问题我想放我的分析直到我到达的地方由于Spark的新事物我无法进一步移动所以如果有人帮助我它对所有人都非常有帮助。 提前致谢。 我面临的例外是构造函数GraphFrame(DataFrame,DataFrame)未定义 import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.storage.StorageLevel; import org.graphframes.GraphFrame; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; public class SparkJavaGraphFrameOne { public static void main(String[] args) throws JsonParseException, JsonMappingException, […]

Spark与Cassandra输入/输出

想象一下以下场景:Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据。 该应用程序还从数据库中蒸出新数据,这些数据也由自定义接收器处理。 流处理的输出存储在数据库中。 该实现使用Spring Data Cassandra与数据库集成。 CassandraConfig: @Configuration @ComponentScan(basePackages = {“org.foo”}) @PropertySource(value = { “classpath:cassandra.properties” }) public class CassandraConfig { @Autowired private Environment env; @Bean public CassandraClusterFactoryBean cluster() { CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean(); cluster.setContactPoints(env.getProperty(“cassandra.contactpoints”)); cluster.setPort(Integer.parseInt(env.getProperty(“cassandra.port”))); return cluster; } @Bean public CassandraMappingContext mappingContext() { return new BasicCassandraMappingContext(); } @Bean public CassandraConverter converter() { return new […]

Apache Spark:在Java中有效地使用mapPartitions

在当前早期发布的名为High Performance Spark的教科书中, Spark的开发人员注意到: 为了让Spark能够灵活地将一些记录溢出到磁盘,重要的是在mapPartitions中表示你的函数,这样你的函数就不会强制将整个分区加载到内存中(例如隐式转换为列表)。 迭代器有很多方法可以编写函数式转换,或者你可以构造自己的自定义迭代器。 当转换直接获取并返回迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器的转换。 但是,教科书缺乏使用mapPartitions或类似方法变体的好例子。 并且在线存在很少的好代码示例 – 其中大多数是Scala。 例如,我们使用mapPartitions编写的mapPartitions看到这个Scala代码如何将列添加到mapPartitions中的org.apache.spark.sql.Row中 。 def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow) sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show 不幸的是,Java没有提供像iter.map(…)那样好的迭代器。 所以它引出了一个问题,如何有效地使用mapPartitions的迭代器到迭代器转换而不将RDD作为列表完全溢出到磁盘? JavaRDD collection = prevCollection.mapPartitions((Iterator iter) -> { ArrayList out = new ArrayList(); while(iter.hasNext()) { InObj current = iter.next(); out.add(someChange(current)); } return out.iterator(); }); 这似乎是在Java示例中使用mapPartitions的一般语法,但我不知道这将是多么有效,假设你有一个拥有数万条记录的JavaRDD (甚至更多…因为,Spark是对于大数据)。 你最终会得到迭代器中所有对象的列表,只是将它转回迭代器(这就要求某种地图函数在这里效率更高)。 注意 :虽然使用mapPartitions这8行代码可以写成带有map或flatMap 1行,但我有意使用mapPartitions来利用它对每个分区而不是RDD每个元素进行操作的事实。 请问有什么想法吗?

从Apache Spark SQL中的用户定义聚合函数(UDAF)返回多个数组

我正在尝试使用Apache Spark SQL在Java中创建用户定义的聚合函数(UDAF),该函数在完成时返回多个数组。 我在网上搜索过,找不到任何关于如何做到这一点的例子或建议。 我能够返回单个数组,但无法弄清楚如何在evaluate()方法中以正确的格式获取数据以返回多个数组。 UDAF确实有效,因为我可以在evaluate()方法中打印出数组,我无法弄清楚如何将这些数组返回到调用代码(下面显示以供参考)。 UserDefinedAggregateFunction customUDAF = new CustomUDAF(); DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col(“long_col”), dataFrame.col(“double_col”))).as(“processed_data”); 我在下面包含了整个自定义UDAF类,但关键方法是dataType()和evaluate方法(),它们首先显示。 任何帮助或建议将不胜感激。 谢谢。 public class CustomUDAF extends UserDefinedAggregateFunction { @Override public DataType dataType() { // TODO: Is this the correct way to return 2 arrays? return new StructType().add(“longArray”, DataTypes.createArrayType(DataTypes.LongType, false)) .add(“dataArray”, DataTypes.createArrayType(DataTypes.DoubleType, false)); } @Override public Object evaluate(Row buffer) […]

sparkContext JavaSparkContext SQLContext SparkSession之间的区别?

sparkContext, javaSparkContext, SQLContext和SparkSession什么SparkSession 。 有没有使用Sparksession转换或创建Context的方法? 我可以使用一个条目SparkSession完全替换所有Context吗? 是否在SQLContext中添加了SparkSession , SparkContext , JavaSparkContext等中的所有函数? 像parallelize这样的函数在SparkContext和JavaSparkContext有不同的用法。 如何在SparkSession使用这样的function? 如何使用SparkSession创建以下SparkSession ? RDD JavaRDD JavaPairRDD 数据集 有没有方法将JavaPairRDD转换为Dataset或Dataset到JavaPairRDD ?

Spark – foreach Vs foreachPartitions何时使用什么?

我想知道foreachPartitions是否会产生更好的性能,因为更高的并行度,与foreach方法相比,考虑到我正在流经RDD以便对累加器变量执行一些求和的情况。

Spark 1.5.1,Cassandra Connector 1.5.0-M2,Cassandra 2.1,Scala 2.10,NoSuchMethodError番石榴依赖

Spark环境的新手(对Maven来说还是新手)所以我正在努力解决如何正确发送我需要的依赖项。 看起来Spark 1.5.1有一个guava-14.0.1依赖,它试图使用,而且isPrimitive是在15+中添加的。 确保我的优步jar获胜的正确方法是什么? 我在我的spark-defaults.conf中尝试过spark.executor.extraClassPath无济于事。 复制到[问题]: Spark 1.5.1 + Scala 2.10 + Kafka + Cassandra = Java.lang.NoSuchMethodError:但对于Maven来说(还没有回复评论) 剥下我的依赖关系: com.google.guava guava 18.0 org.apache.commons commons-compress 1.10 com.esotericsoftware.kryo kryo 2.21 org.objenesis objenesis 2.1 org.apache.spark spark-core_2.10 1.5.0 org.slf4j slf4j-log4j12 log4j log4j org.apache.spark spark-sql_2.10 1.5.0 com.datastax.spark spark-cassandra-connector_2.10 1.5.0-M2 使用以下方法为我的JAR添加所有依赖项: org.apache.maven.plugins maven-shade-plugin 2.3 package shade org.apache.hadoop:* org.apache.hbase:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA […]

无法连接到spark master:InvalidClassException:org.apache.spark.rpc.RpcEndpointRef; 本地类不兼容

我在Linux机器上安装了Spark。 版本是spark-1.6.2-bin-hadoop2.6.tgz。 然后我使用./sbin/start-all.sh启动Spark 我试图在Eclipse中运行JavaWordCount.java示例。 但总是失败。 有人可以帮助我吗? Spark Master的版本是:欢迎使用版本1.6.2,使用Scala版本2.10.5(Java HotSpot(TM)Server VM,Java 1.8.0_101),Eclipse上的Spark版本是: 例外情况如下: 16/07/25 12:01:20 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark:// hostname:7077… 16/07/25 12:01:20 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master hostname:7077 org.apache.spark.SparkException: Exception thrown in awaitResult at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) at […]

使用saveAsTextFile的Spark NullPointerException

在尝试合并并保存RDD时,我得到了一个N​​PE。 代码在本地工作, 并在scala shell中的集群上工作,但在将其作为作业提交到集群时会引发错误。 我已经尝试使用take()打印输出以查看rdd是否包含一些空数据,但这会引发相同的错误 – 因为它在shell中正常工作会很痛苦。 我正在保存到HDFS并且在变量中有完整的url路径 – 在MLLib训练阶段,模型可以使用此方法保存。 任何想法非常感谢! Scala代码(整体预测function): //Load the Random Forest val rfModel = RandomForestModel.load(sc, modelPath) //Make the predictions – Here the label is the unique ID of the point val rfPreds = labDistVect.map(p => (p.label, rfModel.predict(p.features))) //Collect and save println(“Done Modelling, now saving preds”) val outP = rfPreds.coalesce(1,true).saveAsTextFile(outPreds) println(“Done […]

更改DataFrame.write()的输出文件名前缀

通过Spark SQL DataFrame.write()方法生成的输出文件以“part”basename前缀开头。 例如 DataFrame sample_07 = hiveContext.table(“sample_07”); sample_07.write().parquet(“sample_07_parquet”); 结果是: hdfs dfs -ls sample_07_parquet/ Found 4 items -rw-r–r– 1 rob rob 0 2016-03-19 16:40 sample_07_parquet/_SUCCESS -rw-r–r– 1 rob rob 491 2016-03-19 16:40 sample_07_parquet/_common_metadata -rw-r–r– 1 rob rob 1025 2016-03-19 16:40 sample_07_parquet/_metadata -rw-r–r– 1 rob rob 17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet 我想更改使用Spark SQL DataFrame.write()创建文件时使用的输出文件名前缀。 我尝试在Spark上下文的hadoop配置中设置“mapreduce.output.basename”属性。 例如 public […]