Tag: apache spark

Spark 1.6-无法在hadoop二进制路径中找到winutils二进制文件

我知道有一个非常相似的post( 无法在hadoop二进制路径中找到winutils二进制文件 ),但是,我已经尝试了建议的每一步,但仍然出现相同的错误。 我正在尝试在Windows 7上使用Apache Spark版本1.6.0来执行此页面上的教程http://spark.apache.org/docs/latest/streaming-programming-guide.html ,具体使用此代码: ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 但是,此错误不断出现: 阅读本文后, 无法在hadoop二进制路径中找到winutils二进制文件 我意识到我需要winutils.exe文件,所以我用它下载了一个hadoop二进制2.6.0,定义了一个名为HADOOP_HOME的环境变量: with value C:\Users\GERAL\Desktop\hadoop-2.6.0\bin 并将它放在路径上,如下所示:%HADOOP_HOME% 但是当我尝试代码时仍会出现相同的错误。 有谁知道如何解决这个问题?

Spark 2.0.0 Arrays.asList无法正常工作 – 不兼容的类型

以下代码适用于Spark 1.5.2但不适用于Spark 2.0.0。 我使用的是Java 1.8。 final SparkConf sparkConf = new SparkConf(); sparkConf.setMaster(“local[4]”); // Four threads final JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); final JavaRDD javaRDDLines = javaSparkContext.textFile(“4300.txt”); final JavaRDD javaRDDWords = javaRDDLines.flatMap(line -> Arrays.asList(line.split(” “))); 我得到以下错误 Error:(46, 66) java: incompatible types: no instance(s) of type variable(s) T exist so that java.util.List conforms to java.util.Iterator 我无法确定Spark API是否已更改或其他内容。 […]

Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行

我正在使用Spark Cassandra连接器。 从Cassandra表获取数据需要5-6分钟。 在Spark中,我在日志中看到了许多任务和Executor。 原因可能是Spark在许多任务中划分了这个过程! 下面是我的代码示例: public static void main(String[] args) { SparkConf conf = new SparkConf(true).setMaster(“local[4]”) .setAppName(“App_Name”) .set(“spark.cassandra.connection.host”, “127.0.0.1”); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD empRDD = javaFunctions(sc).cassandraTable(“dev”, “demo”); System.out.println(“Row Count”+empRDD.count()); }

如何使用JAVA在Spark DataFrame上调用UDF?

和这里一样的问题,但没有足够的意见在那里发表评论。 根据最新的Spark 文档 ,udf()可以以两种不同的方式使用,一种使用SQL,另一种使用DataFrame。 我发现了多个如何使用udf()和sql的示例,但是还没有找到关于如何直接在DataFrame上使用udf()的任何内容。 op在上面链接的问题上提供的解决方案使用了不推荐使用的callUDF() ,并且将根据Spark Java API文档在Spark 2.0中删除。 在那里,它说“因为它与udf()是多余的”,所以这意味着我应该能够使用udf()来调用我的udf(),但是无法弄清楚如何做到这一点。 我发现使用来自Java的Spark是非常不值得的..无休止的谷歌搜索每一步只是为了弄清楚如何进行明显的操作……每一步所忍受的痛苦都不会减轻下一步所需的痛苦!! 我没有偶然发现一些拼写Java-Spark程序语法的东西。 我错过了什么? import org.apache.spark.sql.api.java.UDF1; . . UDF1 mode = new UDF1() { public String call(final String[] types) throws Exception { return types[0]; } }; sqlContext.udf().register(“mode”, mode, DataTypes.StringType); df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

计算RDD中的行数

我正在使用带有java的spark,我有一个500万行的RDD。 是否有一个sollution可以让我计算我的RDD的行数。 我尝试过RDD.count()但需要花费很多时间。 我已经看到我可以使用functionfold 。 但我没有找到这个函数的java文档。 你能告诉我如何使用它或给我看另一个解决方案来获取我的RDD的行数。 这是我的代码: JavaPairRDD lines = getAllCustomers(sc).cache(); JavaPairRDD CFIDNotNull = lines.filter(notNull()).cache(); JavaPairRDD<String, Tuple2> join =lines.join(CFIDNotNull).cache(); double count_ctid = (double)join.count(); // i want to get the count of these three RDD double all = (double)lines.count(); double count_cfid = all – CFIDNotNull.count(); System.out.println(“********** :”+count_cfid*100/all +”% and now : “+ count_ctid*100/all+”%”); 谢谢。

如何使用单个spark上下文在Apache Spark中运行并发作业(操作)

它在Apache Spark文档中说“ 在每个Spark应用程序中,多个”作业“(Spark动作)如果由不同的线程提交,可能会同时运行 ”。 有人可以解释如何实现以下示例代码的并发性吗? SparkConf conf = new SparkConf().setAppName(“Simple_App”); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD file1 = sc.textFile(“/path/to/test_doc1”); JavaRDD file2 = sc.textFile(“/path/to/test_doc2”); System.out.println(file1.count()); System.out.println(file2.count()); 这两个工作是独立的,必须同时运行。 谢谢。

使用Spark从Azure Blob读取数据

我在通过火花流从azure色斑点中读取数据时遇到了问题 JavaDStream lines = ssc.textFileStream(“hdfs://ip:8020/directory”); 上面的代码适用于HDFS,但无法从Azure blob读取文件 https://blobstorage.blob.core.windows.net/containerid/folder1/ 以上是azure UI中显示的路径,但这不起作用,我错过了什么,我们如何访问它。 我知道Eventhub是流数据的理想选择,但我目前的情况要求使用存储而不是队列

使用sc.textFile以递归方式从子目录中获取文件内容

似乎SparkContext textFile只希望文件存在于给定的目录位置 – 它也不存在 (a)递归或 (b)甚至支持目录(尝试将目录读取为文件) 任何建议如何构造递归 – 可能比手动创建递归文件列表/下降逻辑更简单? 这是用例:文件下 /数据/表/ MY_TABLE 我希望能够通过hdfs调用该父目录下所有目录级别的所有文件。 UPDATE sc.textFile()通过(子类)TextInputFormat调用Hadoop FileInputFormat。 在逻辑内部存在执行递归目录读取 – 即首先检测条目是否是目录,如果是,则降序: for (FileStatus globStat: matches) { 218 if (globStat.isDir()) { 219 for(FileStatus stat: fs.listStatus(globStat.getPath(), 220 inputFilter)) { 221 result.add(stat); 222 } 223 } else { 224 result.add(globStat); 225 } 226 } 但是,在调用sc.textFile时,目录条目上存在错误:“not a file”。 这种行为令人困惑 – 因为似乎有适当的支持来处理目录。

无法读取工件描述符:IntelliJ

我遇到了我的Maven POM文件的问题,它无法找到火花依赖并且返回错误:无法读取org.apache.spark的工件描述符:spark-streaming-kafka_2.10:jar:1.2.1 我已经确认它不是任何公司防火墙的问题,因为所有其他依赖项都正确加载,只是这个。 我也能够在我的maven设置中确认它正试图从以下回购中获取。 我尝试删除本地计算机上的.m2 repo以重新加载它,仍然没有骰子。 http://repo.maven.apache.org/maven2/org/apache/spark/spark-streaming-kafka_2.10/1.2.1/ 下面是我的pom文件 my.group.id sentiment 1.0-SNAPSHOT NPITWITTER com.sparkjava spark-core 1.1.1 org.apache.spark spark-streaming-kafka_2.10 1.2.1 org.apache.spark spark-core_2.10 1.2.1 org.apache.spark spark-streaming_2.10 1.2.1 org.apache.spark spark-hive_2.10 1.2.1 org.apache.spark spark-sql_2.10 1.2.1

LogisticRegression的Spark MLLib TFIDF实现

我尝试使用火花1.1.0提供的新TFIDF算法。 我正在用Java编写MLLib的工作,但我无法弄清楚如何使TFIDF实现工作。 由于某种原因, IDFModel仅接受JavaRDD作为方法转换的输入而不是简单的Vector。 如何使用给定的类为我的LabledPoints建模TFIDF向量? 注意:文档行的格式为[标签; 文本] 到目前为止我的代码: // 1.) Load the documents JavaRDD data = sc.textFile(“/home/johnny/data.data.new”); // 2.) Hash all documents HashingTF tf = new HashingTF(); JavaRDD<Tuple2> tupleData = data.map(new Function<String, Tuple2>() { @Override public Tuple2 call(String v1) throws Exception { String[] data = v1.split(“;”); List myList = Arrays.asList(data[1].split(” “)); return new Tuple2(Double.parseDouble(data[0]), tf.transform(myList)); […]