Tag: apache spark

Spark SQL:镶嵌错误的嵌套类

我似乎无法写一个JavaRDD ,其中T是一个说法, Person类。 我把它定义为 public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private String age; private Address address; …. Address : public class Address implements Serializable { private static final long serialVersionUID = 1L; private String City; private String Block; … 然后我像这样创建一个JavaRDD : JavaRDD people = sc.textFile(“/user/johndoe/spark/data/people.txt”).map(new […]

基于第二个Dataframe的DataFrame过滤

使用Spark SQL,我有两个dataframe,它们是从一个创建的,例如: df = sqlContext.createDataFrame(…); df1 = df.filter(“value = ‘abc'”); //[path, value] df2 = df.filter(“value = ‘qwe'”); //[path, value] 我想过滤df1,如果它的’path’的一部分是df2中的任何路径。 因此,如果df1具有路径’a / b / c / d / e’的行,我会发现在df2中是否是路径为’a / b / c’的行。 在SQL中应该是这样的 SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2) 其中udf是用户定义的函数,用于缩短df1的原始路径。 天真的解决方案是使用JOIN然后过滤结果,但它很慢,因为df1和df2每行都超过10mil。 我也试过以下代码,但首先我必须从df2创建广播变量 static Broadcast bdf; bdf = sc.broadcast(df2); //variable […]

Spark流mapWithState超时延迟了吗?

我期望Spark 1.6+的新mapWithState API几乎立即删除超时的对象,但是有一个延迟。 我正在使用下面的JavaStatefulNetworkWordCount的改编版本测试API: SparkConf sparkConf = new SparkConf() .setAppName(“JavaStatefulNetworkWordCount”) .setMaster(“local[*]”); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); ssc.checkpoint(“./tmp”); StateSpec<String, Integer, Integer, Tuple2> mappingFunc = StateSpec.function((word, one, state) -> { if (state.isTimingOut()) { System.out.println(“Timing out the word: ” + word); return new Tuple2(word, state.get()); } else { int sum = one.or(0) + (state.exists() ? state.get() : […]

如何在GroupBy操作后从spark DataFrame列中收集字符串列表?

这里描述的解决方案(通过zero323)非常接近我想要的两个曲折: 我如何用Java做到这一点? 如果列有一个字符串列表而不是一个字符串,并且我想在GroupBy(其他一些列)之后将所有这些列表收集到一个列表中,该怎么办? 我正在使用Spark 1.6并试图使用 org.apache.spark.sql.functions.collect_list(Column col) ,如该问题的解决方案中所述,但得到以下错误 线程“main”中的exceptionorg.apache.spark.sql.AnalysisException:undefined function collect_list; at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry.scala:65)at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry。 scala:65)在scala.Option.getOrElse(Option.scala:121)

如何使用Java有效地读取Hadoop(HDFS)文件中的第一行?

我的Hadoop集群上有一个大的CSV文件。 该文件的第一行是“标题”行,由字段名称组成。 我想对这个标题行进行操作,但我不想处理整个文件。 另外,我的程序是用Java编写的,并使用Spark。 在Hadoop集群上只读取大型CSV文件的第一行的有效方法是什么?

Apache Spark mapPartitionsWithIndex

有人能举例说明在Java中正确使用mapPartitionsWithIndex吗? 我发现了许多Scala示例,但缺少Java。 我的理解是正确的,使用此函数时,单独的节点将处理单独的分区。 我收到以下错误 method mapPartitionsWithIndex in class JavaRDD cannot be applied to given types; JavaRDD rdd = sc.textFile(filename).mapPartitionsWithIndex required: Function2<Integer,Iterator,Iterator>,boolean found: <anonymous Function2<Integer,Iterator,Iterator<JavaRDD>>> 做的时候 JavaRDD rdd = sc.textFile(filename).mapPartitionsWithIndex( new Function2<Integer, Iterator, Iterator<JavaRDD> >() { @Override public Iterator<JavaRDD> call(Integer ind, String s) {

Spark DataFrame – 选择n个随机行

我有一个包含数千条记录的数据框,我想随机选择1000行到另一个数据框进行演示。 我怎么能用Java做到这一点? 谢谢!

关于hadoop 2.2.0 maven依赖性的火花0.9.1

我在pom.xml中设置了Apache Spark maven依赖,如下所示 org.apache.spark spark-core_2.10 0.9.1 但我发现这个依赖使用“ hadoop-client-1.0.4.jar ”和“ hadoop-core-1.0.4.jar ”,当我运行我的程序时,我得到了错误“ org.apache.hadoop.ipc。 RemoteException:服务器IPC版本9无法与客户端版本4 “ 通信 ,这表明我需要将hadoop版本从1.0.4切换到2.2.0。 更新 : 以下解决方案是解决此问题的正确方法吗? org.apache.spark spark-core_2.10 0.9.1 org.apache.hadoop hadoop-core org.apache.hadoop hadoop-client org.apache.hadoop hadoop-client 2.2.0 非常感谢您的帮助。

如何使Spark Streaming计算unit testing中文件中的单词?

我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中的HdfsCount示例 。 当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数。 我按Ctrl + C终止应用程序。 现在我已经尝试为这个function创建一个非常基本的unit testing,但是在测试中我无法打印相同的信息,即单词的数量。 我错过了什么? 下面是unit testing文件,之后我还包含了显示countWords方法的代码片段: StarterAppTest.java import com.google.common.io.Files; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.*; import java.io.*; public class StarterAppTest { JavaStreamingContext ssc; File tempDir; @Before public void setUp() { ssc = new JavaStreamingContext(“local”, “test”, new Duration(3000)); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); } @After public void […]

Spark Local Mode – 所有作业仅使用一个CPU核心

我们使用在单个AWS EC2实例上以本地模式运行Spark Java “local[*]” 但是,使用New Relic工具进行性能分析和简单的“顶级”显示,我们已经编写了三个不同的Java spark工作,我们的16个核心机器中只有一个CPU核心用过(我们也尝试过不同的AWS实例,但只有一个核心永远使用)。 Runtime.getRuntime().availableProcessors()报告16个处理器, sparkContext.defaultParallelism()报告16个处理器。 我查看了各种Stackoverflow本地模式问题,但似乎没有解决问题。 任何建议都非常感谢。 谢谢 编辑:过程 1)使用sqlContext从光盘(S3)使用com.databricks.spark.csv读取gzip压缩的CSV文件1到DataFrame DF1。 2)使用sqlContext从光盘(S3)使用com.databricks.spark.csv将gzip压缩的CSV文件2读入DataFrame DF2。 3)使用DF1.toJavaRDD()。mapToPair(返回元组的新映射函数)RDD1 4)使用DF2.toJavaRDD()。mapToPair(返回元组的新映射函数)RDD2 5)在RDD上调用union 6)将联合RDD上的reduceByKey()调用为“按键合并”,因此具有仅具有特定键的一个实例的元组>(因为同一个键出现在RDD1和RDD2中)。 7)调用.values()。map(新映射函数,它迭代提供的List中的所有项目,并根据需要合并它们以返回相同或更小长度的List 8)调用.flatMap()来获取RDD 9)使用sqlContext从DomainClass类型的平面地图创建DataFrame 10)使用DF.coalease(1).write()将DF作为gzip压缩写入S3。