Tag: apache spark

缓存()/ persist()的apache-spark内存消耗

当我尝试缓存()或持久化(MEMORY_ONLY_SER())我的RDD时,我的spark簇会挂起。 它工作得很好,并在大约7分钟内计算结果。 如果我不使用cache()。 我有6个c3.xlarge EC2实例(4个内核,每个7.5 GB RAM),共有24个内核和37.7 GB。 我在master上使用以下命令运行我的应用程序: SPARK_MEM = 5g MEMORY_FRACTION =“0.6”SPARK_HOME =“/ root / spark”java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-孵化-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis 数据集大约有50GB的数据分成24个文件。 我将其压缩并存储在S3存储桶中的24个文件中(每个文件大小为7MB到300MB)。 我绝对找不到我的集群这种行为的原因,但似乎火花消耗了所有可用内存并进入了GC收集循环。 当我查看gc verbose时,我可以找到如下的循环: [GC 5208198K(5208832K), 0,2403780 secs] [Full GC 5208831K->5208212K(5208832K), 9,8765730 secs] [Full GC 5208829K->5208238K(5208832K), 9,7567820 secs] [Full GC 5208829K->5208295K(5208832K), 9,7629460 secs] [GC 5208301K(5208832K), 0,2403480 secs] [Full GC 5208831K->5208344K(5208832K), 9,7497710 secs] [Full […]

Spark – 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?

看看这个问题: Scala + Spark – 任务不可序列化:java.io.NotSerializableExceptionon。 当只在类而不是对象上调用闭包外的函数时 。 问题: 假设我的映射器可以是函数(def),它在内部调用其他类并创建对象并在其中执行不同的操作。 (或者它们甚至可以是扩展(Foo)=> Bar的类并在其apply方法中进行处理 – 但是现在让我们忽略这种情况) Spark仅支持用于闭包的Java Serialization。 有没有办法解决这个问题? 我们可以用东西而不是封闭来做我想做的事吗? 我们可以使用Hadoop轻松完成这类工作。 这一点让Spark几乎无法使用。 人们不能指望所有第三方库都将所有类扩展为Serializable! 可能的解决方案: 这样的事情似乎有用吗: https : //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala 它看起来似乎是一个包装器的答案,但我不知道究竟是怎么回事。

如何使用Spark DataFrame计算Cassandra表的汇总统计量?

我试图得到一些Cassandra / SPARK数据的最小值,最大值,但我需要用JAVA来做。 import org.apache.spark.sql.DataFrame; import static org.apache.spark.sql.functions.*; DataFrame df = sqlContext.read() .format(“org.apache.spark.sql.cassandra”) .option(“table”, “someTable”) .option(“keyspace”, “someKeyspace”) .load(); df.groupBy(col(“keyColumn”)) .agg(min(“valueColumn”), max(“valueColumn”), avg(“valueColumn”)) .show(); 编辑显示工作版本:确保“围绕someTable和someKeyspace

如何读取嵌套的JSON以进行聚合?

我是Spark的新手。 我想要做的就是读取嵌套的jsons并根据特定条件对它们进行分组。 例如:如果json包含像他的城市和邮政编码这样的人的详细信息。 我想对属于同一城市和邮政编码的人进行分组。 我已经取得了进展,直到将jsons读入DataSet。 但我不知道如何将它们分组。 我的嵌套JSON格式是 { “entity”: { “name”: “SJ”, “id”: 31 }, “hierarchy”: { “state”: “TN”, “city”: “CBE” }, “data”: {}} 这是我编写的用于从文件中读取嵌套json的代码。 public void groupJsonString(SparkSession spark) { Dataset studentRecordDS = spark.read() .option(“timestampFormat”, “yyyy/MM/dd HH:mm:ss ZZ”) .json(“/home/shiney/Documents/NGA/sparkJsonFiles/*.json”); StructType st = studentRecordDS.schema(); List nestedList = new ArrayList(); for(StructField field : st.fields()) { nestedList.add((StructType)field.dataType()); } […]

在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?

我正在尝试模拟我们正试图解决的遗传问题,逐步建立起来。 我可以成功运行Spark示例中的PiAverage示例。 这个例子在一个圆圈(在我们的例子中为10 ^ 6)“投掷飞镖”并计算“在圆圈中着陆”以估计PI的数量 假设我想重复该过程1000次(并行)并平均所有这些估计值。 我试图看到最好的方法,似乎有两个并行化的调用? 嵌套电话? 有没有办法将地图链接起来或减少一起呼叫? 我看不到它。 我想知道下面这个想法的智慧。 我想过使用累加器跟踪得到的估计值。 jsc是我的SparkContext,单个运行的完整代码是在问题的结尾,感谢任何输入! Accumulator accum = jsc.accumulator(0.0); // make a list 1000 long to pass to parallelize (no for loops in Spark, right?) List numberOfEstimates = new ArrayList(HOW_MANY_ESTIMATES); // pass this “dummy list” to parallelize, which then // calls a pieceOfPI method to produce each […]

Apache Spark中的数据集

Dataset ds = sc.read().json(“path”).as(Encoders.bean(Tweet.class)); ds.show(); JavaRDD dstry = ds.toJavaRDD(); System.out.println(dstry.first().getClass()); Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File ‘generated.java’, Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File ‘generated.java’, Line 50, Column 16: No applicable constructor/method found for actual parameters “org.apache.spark.unsafe.types.UTF8String”; candidates are: “public void sparkSQL.Tweet.setId(long)” at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) […]

无法使用Java连接到HBase

我正在尝试使用Java连接HBase。 只有一个节点,这是我自己的机器。 好像我无法成功连接。 这是我的Java代码: public class Test { public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ServiceException { SparkConf conf = new SparkConf().setAppName(“Test”).setMaster(“spark://10.239.58.111:7077”); JavaSparkContext sc = new JavaSparkContext(conf); sc.addJar(“/home/cloudera/workspace/Test/target/Test-0.0.1-SNAPSHOT.jar”); Configuration hbaseConf = HBaseConfiguration.create(); hbaseConf.addResource(new Path(“/usr/lib/hbase/conf/hbase-site.xml”)); HTable table = new HTable(hbaseConf, “rdga_by_id”); } } 我试着在这样的代码中设置环境, hbaseConf.set(“hbase.master”, “localhost”); hbaseConf.set(“hbase.master.port”, “60000”); hbaseConf.set(“hbase.zookeeper.property.clientPort”, “2181”); hbaseConf.set(“hbase.zookeeper.quorum”, “quickstart.cloudera”); hbaseConf.set(“hbase.zookeeper.quorum”, “localhost”); […]

如何在不使用collect函数的情况下有效地将rdd转换为list

我们知道如果我们需要将RDD转换为列表,那么我们应该使用collect()。 但是这个函数给驱动程序带来了很大的压力(因为它将来自不同执行程序的所有数据带到驱动程序中),这会导致性能下降或更糟(整个应用程序可能会失败)。 有没有其他方法可以将RDD转换为任何java util集合而不使用不会导致性能下降的collect()或collectAsMap()等? 基本上在我们处理批量或流数据处理中的大量数据的当前场景中,诸如collect()和collectAsMap()之类的API在具有实际数据量的真实项目中变得完全无用。 我们可以在演示代码中使用它,但这些都可用于这些API。 那么为什么要有一个我们甚至无法使用的API(或者我错过了什么)。 有没有更好的方法通过其他方法实现相同的结果,或者我们可以更有效地实现collect()和collectAsMap()其他只是调用 List myList= RDD.collect.toList (影响性能) 我抬头看谷歌但找不到任何有效的东西。 如果有人有更好的方法,请帮忙。

类型不匹配:无法从Java Spark中的Iterator 转换

线程“main”中的exceptionjava.lang.Error:未解决的编译问题:类型不匹配:无法从Iterator转换为Iterable 在com.spark.wordcount.lession1.WordCount2.main(WordCount2.java:26) SparkConf conf = new SparkConf().setAppName(“cust data”).setMaster(“local[*]”); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD lines = sc.textFile(“C:\\\\Users\\\\dell\\\\Desktop\\\\simple_text_file.txt”); JavaRDD words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator()); JavaPairRDD ones = words.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD counts = ones.reduceByKey((i1, i2) -> i1 + i2); List<Tuple2> output = counts.collect(); for (Tuple2 tuple : output) { System.out.println(tuple._1() + “: ” + […]

如果列标签是同名的,如何使用java解析Spark中的XML

我尝试使用java 1.8解析spark 2.2中的XML,但这并没有给出预期的数据集。 样本xml – url1 123 english 我试过的代码 – Dataset xmlParse = spark .read() .format(“com.databricks.spark.xml”) //.option(“rootTag”, “RECORDS”) .option(“rowTag”, “RECORD”) .load(“sample.xml”); xmlParse.printSchema() root |– PROP: array (nullable = true) | |– element: struct (containsNull = true) | | |– PVAL: string (nullable = true) | | |– _NAME: string (nullable = true) xmlParse.show |PROP +——————————————- […]