Tag: apache spark

用spark分析日志文件?

我编写了程序来分析日志,其目的是过滤/打印日志中的错误语句 String master = “local[*]”; String inputFilePath = “C:/test.log”; SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD stringRDD = context.textFile(inputFilePath); stringRDD.filter(text -> text.contains(“ERROR”)) .collect().forEach(result -> System.out.println(result)); 但是日志文件正在由不同的进程连续写入。 这是时间线示例 在T1,日志文件中存在10行 在T2(5秒后),再加入5行 在T3(5秒后),再加入7行 现在我的程序应该在5秒后读取文件并仅从新添加的行打印错误语句。 我是否应该手动生成每隔5秒钟继续读取的线程,或者是否有更好的火花方式? 更新: – 基于谷歌我尝试下面但没有帮助 SparkConf conf = new SparkConf().setAppName(App.class.getName()) .setMaster(master); //JavaSparkContext context = new JavaSparkContext(conf); JavaStreamingContext streamingContext = new […]

初始工作没有接受任何资源; 检查群集UI以确保工作人员已注册并具有足够的资源

我正在尝试从Eclipse运行spark示例并获得此一般错误: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources. 我的版本是spark-1.6.2-bin-hadoop2.6. 我使用shell中的./sbin/start-master.sh命令启动了spark,并将我的sparkConf设置为: SparkConf conf = new SparkConf().setAppName(“Simple Application”); conf.setMaster(“spark://My-Mac-mini.local:7077”); 我没有在这里带任何其他代码,因为我正在运行的任何示例都会弹出此错误。 该机器是Mac OSX,我很确定它有足够的资源来运行最简单的例子。 […]

Spark运行时错误:spark.metrics.sink.MetricsServlet无法实例化

在IntelliJ中使用maven中的spark 1.3 lib运行项目时,我遇到了调用目标exception。 我只在IntelliJ IDE中遇到此错误。 在我部署jar并通过spark-submit运行后,错误消失了。 以前有人遇到过同样的问题吗? 我希望解决这个问题,以便进行简单的调试。 否则每次我想运行代码时都要打包jar。 详情如下: 2015-04-21 09:39:13 ERROR MetricsSystem:75 – Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized 2015-04-21 09:39:13 ERROR TrainingSFERunner:144 – java.lang.reflect.InvocationTargetException 2015-04-20 16:08:44 INFO BlockManagerMaster:59 – Registered BlockManager Exception in thread “main” java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:187) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:181) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at […]

如何通过Sparklyr在本地模式下运行Spark时配置驱动程序内存?

我正在使用Sparklyr在具有244GB RAM的虚拟机上以本地模式运行Spark应用程序。 在我的代码中,我使用spark_read_csv()从一个文件夹读取~50MB的csvs,然后从第二个文件夹读取~1.5GB的csvs。 我的问题是,当尝试读取第二个文件夹时,应用程序会抛出错误。 据我了解,问题是驱动程序JVM可用的默认RAM是512MB – 对于第二个文件夹来说太小(在本地模式下,所有操作都在驱动程序JVM中运行,如此处所述如何设置Apache Spark Executor内存 。所以我需要将spark.driver.memory参数增加到更大的值。 问题是我不能通过sparklyr文档中描述的常规方法设置此参数(即通过spark_config() , config.yml文件或spark-defaults.conf文件): 在本地模式下,当你运行spark-submit时,已经使用默认的内存设置启动了JVM,因此在conf中设置“spark.driver.memory”实际上并不会为你做任何事情。 相反,您需要运行spark-submit,如下所示: bin/spark-submit –driver-memory 2g –class your.class.here app.jar (来自如何设置Apache Spark Executor内存 )。 我以为我可以通过在sparklyr.shell.driver-memory添加sparklyr.shell.driver-memory选项来复制上面的bin/spark-submit命令; 如Sparklyr文档中所述; sparklyr.shell* options是传递给spark-submit命令行参数,即添加sparklyr.shell.driver-memory: 5G到config.yml文件应该相当于运行bin/spark-submit –driver-memory 5G 。 我现在已经尝试了上述所有选项,但它们都没有更改Spark应用程序中的驱动程序内存(我通过查看Spark UI的’Executors’选项卡进行检查)。 那么如何通过Sparklyr在本地模式下运行Spark时更改驱动程序内存?

如何使用apache spark的MLlib的线性回归?

我是apache spark的新手,从MLlib的文档中,我发现了一个scala的例子,但我真的不知道scala,有人知道java中的一个例子吗? 谢谢! 示例代码是 import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint // Load and parse the data val data = sc.textFile(“mllib/data/ridge-data/lpsa.data”) val parsedData = data.map { line => val parts = line.split(‘,’) LabeledPoint(parts(0).toDouble, parts(1).split(‘ ‘).map(x => x.toDouble).toArray) } // Building the model val numIterations = 20 val model = LinearRegressionWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and […]

在Java中使用foreachActive for spark Vector

如何在Java中编写简单代码,迭代稀疏向量中的活动元素? 让我们说我们有这样的矢量: Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}); 我尝试使用lambda或Function2(来自三个不同的导入但总是失败)。 如果您使用Function2,请提供必要的导入。

如果在SparkAction中使用PySpark,Oozie作业将无法运行

我在Oozie中遇到过几个SparkAction作业的例子,其中大多数都是用Java编写的。 我编辑了一下并在Cloudera CDH Quickstart 5.4.0(使用Spark版本1.4.0)中运行该示例。 workflow.xml ${jobTracker} ${nameNode} ${master} ${mode} Spark-FileCopy org.apache.oozie.example.SparkFileCopy ${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar ${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt ${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] job.properties nameNode=hdfs://quickstart.cloudera:8020 jobTracker=quickstart.cloudera:8032 master=local[2] mode=client examplesRoot=examples oozie.use.system.libpath=true oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/spark Oozie工作流示例(在Java中)能够完成并完成其任务。 我用Python / PySpark编写了一个spark-submit作业。 我尝试删除和jar my_pyspark_job.py 但是当我尝试运行Oozie-Spark作业时,我在日志中出错: Launcher ERROR, reason: Main class [org.apache.oozie.action.hadoop.SparkMain], exit code [2] 我想知道如果我使用Python / PySpark,我应该在和标签中放置什么?

apache zeppelin抛出NullPointerException错误

我是zeppelin的新手并试图在我的系统上设置zeppelin。 直到现在我已经完成了以下步骤: 从这里下载齐柏林飞艇 在我的系统环境变量中设置JAVA_HOME。 转到zeppelin-0.7.3-bin-all \ bin并运行zeppelin.cmd 能够在http:// localhost:8090上看到zeppelin-ui 当我试图将load data into table运行load data into table zeppelin tutotial – > Basic Features(spark)中提到的load data into table程序时,它会抛出以下错误 java.lang.NullPointerException at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38) at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:33) at org.apache.zeppelin.spark.SparkInterpreter.createSparkContext_2(SparkInterpreter.java:398) at org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:387) at org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:146) at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:843) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:491) at org.apache.zeppelin.scheduler.Job.run(Job.java:175) at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at […]

如何下载dse.jar

我正在尝试使用DataStax Enterprise 4.6在Java中编写Spark应用程序,并在DSE的Spark分析模式下运行它。 使用DSEConfHelper创建Spark上下文的代码: SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf()) .setAppName( “My application”); 要使用DSEConfHelper我们需要导入位于dse.jar中的dse.jar 。 在我的pom.xml我包含了依赖项: com.datastax bdp 4.6.0 但是Maven无法下载dse.jar 。 请帮帮我。 用于创建Spark上下文的代码的参考来自: http : //www.datastax.com/documentation/datastax_enterprise/4.6/datastax_enterprise/spark/sparkJavaApi.html

如何在Java Spark RDD上执行标准偏差和平均操作?

我有一个看起来像这样的JavaRDD。 [ [A,8] [B,3] [C,5] [A,2] [B,8] … … ] 我希望我的结果是卑鄙的 [ [A,5] [B,5.5] [C,5] ] 如何仅使用Java RDD执行此操作。 PS:我想避免groupBy操作,所以我没有使用DataFrames。