Tag: apache spark

如果我在Spark中缓存两次相同的RDD会发生什么

我正在构建一个接收RDD的generics函数,并对其进行一些计算。 由于我在输入RDD上运行多个计算,我想缓存它。 例如: public JavaRDD foo(JavaRDD r) { r.cache(); JavaRDD t1 = r… //Some calculations JavaRDD t2 = r… //Other calculations return t1.union(t2); } 我的问题是,因为r是给我的,它可能已经或可能没有被缓存。 如果它被缓存并且我再次调用缓存,那么spark会创建一个新的缓存层,这意味着在计算t1和t2 ,我将在缓存中有两个r实例吗? 或者火花是否意识到r被缓存并将忽略它?

Java,Spark和Cassandra java.lang.ClassCastException:com.datastax.driver.core.DefaultResultSetFuture无法转换为阴影

我在尝试将数据写入我的Cassandra数据库时遇到错误。 我在这里得到的:1)Dictionary.java package com.chatSparkConnactionTest; import java.io.Serializable; public class Dictionary implements Serializable{ private String value_id; private String d_name; private String d_value; public Dictionary(){} public Dictionary (String value_id, String d_name, String d_value) { this.setValue_id(value_id); this.setD_name(d_name); this.setD_value(d_value); } public String getValue_id() { return value_id; } public void setValue_id(String value_id) { this.value_id = value_id; } public String getD_name() { […]

Spark的Column.isin函数不带List

我正在尝试从Spark Dataframe中过滤出行。 val sequence = Seq(1,2,3,4,5) df.filter(df(“column”).isin(sequence)) 不幸的是,我得到了一个不受支持的文字类型错误 java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(1,2,3,4,5) 根据文档,它采用scala.collection.Seq列表 我想我不想要文字? 然后我可以接受什么样的包装类呢?

处理Spark Scala中的微秒

我使用Scala将PostgreSQL表作为dataframe导入spark。 数据框看起来像 user_id | log_dt ——–| ——- 96 | 2004-10-19 10:23:54.0 1020 | 2017-01-12 12:12:14.931652 我正在转换此dataframe,使log_dt的数据格式为yyyy-MM-dd hh:mm:ss.SSSSSS 。 为了实现这一点,我使用以下代码使用unix_timestamp函数将log_dt转换为时间戳格式。 val tablereader1=tablereader1Df.withColumn(“log_dt”,unix_timestamp(tablereader1Df(“log_dt”),”yyyy-MM-dd hh:mm:ss.SSSSSS”).cast(“timestamp”)) 当我打印使用命令tablereader1.show()打印tablereader1dataframe时,我得到以下结果 user_id | log_dt ——–| ——- 96 | 2004-10-19 10:23:54.0 1020 | 2017-01-12 12:12:14.0 如何将微秒保留为时间戳的一部分? 任何建议表示赞赏。

Dag-scheduler-event-loop java.lang.OutOfMemoryError:无法创建新的本机线程

运行5-6小时后,我从火花驱动程序中得到以下错误。 我使用的是Ubuntu 16.04 LTS和open-jdk-8。 Exception in thread “ForkJoinPool-50-worker-11” Exception in thread “dag-scheduler-event-loop” Exception in thread “ForkJoinPool-50-worker-13” java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672) at scala.concurrent.forkjoin.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1795) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:117) java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:714) at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672) at scala.concurrent.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.push(ForkJoinPool.java:1072) at scala.concurrent.forkjoin.ForkJoinTask.fork(ForkJoinTask.java:654) at […]

如何从sparkdataframe列中的数组中提取值

我在spark sql中得到了一个这样的dataframe: scala> result.show +———–+————–+ |probability|predictedLabel| +———–+————–+ | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.1,0.9]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.0,1.0]| 0.0| | [0.1,0.9]| 0.0| | [0.6,0.4]| 1.0| | [0.6,0.4]| 1.0| | [1.0,0.0]| 1.0| […]

Spark – 使用数据框语法进行HAVING分组?

在没有sql / hiveContext的Spark中使用groupby-having的语法是什么? 我知道我能做到 DataFrame df = some_df df.registreTempTable(“df”); df1 = sqlContext.sql(“SELECT * FROM df GROUP BY col1 HAVING some stuff”) 但是我怎么用这样的语法来做呢 df = df.select(df.col(“*”)).groupBy(df.col(“col1”)).having(“some stuff”) ? 这个.having()似乎不存在。

SparkSQL并在Java中的DataFrame上爆炸

有没有一种简单的方法如何在SparkSQL DataFrame上使用数组列explode ? 它在Scala中相对简单,但是这个函数似乎在Java中不可用(如javadoc中所述)。 一个选项是使用SQLContext.sql(…)并在查询中explodefunction,但我正在寻找更好,更清洁的方式。 DataFrame是从镶木地板文件加载的。

在Spark 0.9.0上运行作业会引发错误

我安装了Apache Spark 0.9.0群集,我正在尝试部署从HDFS读取文件的代码。 这段代码会发出警告,最终失败。 这是代码 /** * running the code would fail * with a warning * Initial job has not accepted any resources; check your cluster UI to ensure that * workers are registered and have sufficient memory */ object Main extends App { val sconf = new SparkConf() .setMaster(“spark://labscs1:7077”) .setAppName(“spark scala”) val sctx […]

Spark序列化和Java序列化有什么区别?

我正在使用Spark + Yarn,我有一个我想在分布式节点上调用的服务。 当我在使用java序列化的Junit测试中“手动”序列化此服务对象时,服务的所有内部集合都被很好地序列化和反序列化: @Test public void testSerialization() { try ( ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext(“spring-context.xml”); FileOutputStream fileOutputStream = new FileOutputStream(“myService.ser”); ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream); ) { final MyService service = (MyService) contextBusiness.getBean(“myServiceImpl”); objectOutputStream.writeObject(service); objectOutputStream.flush(); } catch (final java.io.IOException e) { logger.error(e.getMessage(), e); } } @Test public void testDeSerialization() throws ClassNotFoundException { try ( […]