Tag: spark streaming

Spark流mapWithState超时延迟了吗?

我期望Spark 1.6+的新mapWithState API几乎立即删除超时的对象,但是有一个延迟。 我正在使用下面的JavaStatefulNetworkWordCount的改编版本测试API: SparkConf sparkConf = new SparkConf() .setAppName(“JavaStatefulNetworkWordCount”) .setMaster(“local[*]”); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); ssc.checkpoint(“./tmp”); StateSpec<String, Integer, Integer, Tuple2> mappingFunc = StateSpec.function((word, one, state) -> { if (state.isTimingOut()) { System.out.println(“Timing out the word: ” + word); return new Tuple2(word, state.get()); } else { int sum = one.or(0) + (state.exists() ? state.get() : […]

如何使Spark Streaming计算unit testing中文件中的单词?

我已经在Java中成功构建了一个非常简单的Spark Streaming应用程序,该应用程序基于Scala中的HdfsCount示例 。 当我将此应用程序提交给我的本地Spark时,它会等待将文件写入给定目录,当我创建该文件时,它会成功打印出单词数。 我按Ctrl + C终止应用程序。 现在我已经尝试为这个function创建一个非常基本的unit testing,但是在测试中我无法打印相同的信息,即单词的数量。 我错过了什么? 下面是unit testing文件,之后我还包含了显示countWords方法的代码片段: StarterAppTest.java import com.google.common.io.Files; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.*; import java.io.*; public class StarterAppTest { JavaStreamingContext ssc; File tempDir; @Before public void setUp() { ssc = new JavaStreamingContext(“local”, “test”, new Duration(3000)); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); } @After public void […]

如何在Spark中将JavaPairInputDStream转换为DataSet / DataFrame

我正在尝试从kafka接收流数据。 在此过程中,我能够接收流数据并将其存储到JavaPairInputDStream中 。 现在我需要分析这些数据,而不是将其存储到任何数据库中。所以我想将此JavaPairInputDStream转换为DataSet或DataFrame 到目前为止我尝试的是: import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalog.Function; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import kafka.serializer.StringDecoder; import scala.Tuple2; //Streaming Working […]

如何在spark-submit命令中指定要使用的java版本?

我想在远程服务器上的纱线群集上运行火花流应用程序。 默认的java版本是1.7,但我想为我的应用程序使用1.8,它也在服务器中,但不是默认值。 有没有办法通过spark-submit指定java 1.8的位置,这样我就不会遇到major.minor错误?

如何在不使用collect函数的情况下有效地将rdd转换为list

我们知道如果我们需要将RDD转换为列表,那么我们应该使用collect()。 但是这个函数给驱动程序带来了很大的压力(因为它将来自不同执行程序的所有数据带到驱动程序中),这会导致性能下降或更糟(整个应用程序可能会失败)。 有没有其他方法可以将RDD转换为任何java util集合而不使用不会导致性能下降的collect()或collectAsMap()等? 基本上在我们处理批量或流数据处理中的大量数据的当前场景中,诸如collect()和collectAsMap()之类的API在具有实际数据量的真实项目中变得完全无用。 我们可以在演示代码中使用它,但这些都可用于这些API。 那么为什么要有一个我们甚至无法使用的API(或者我错过了什么)。 有没有更好的方法通过其他方法实现相同的结果,或者我们可以更有效地实现collect()和collectAsMap()其他只是调用 List myList= RDD.collect.toList (影响性能) 我抬头看谷歌但找不到任何有效的东西。 如果有人有更好的方法,请帮忙。

Spark Java中的移动平均线

我有实时流数据进入火花,我想对该时间序列数据进行移动平均预测。 有没有办法在Java中使用spark实现它? 我已经提到过: https : //gist.github.com/samklr/27411098f04fc46dcd05/revisions和Apache Spark Moving Average,但这些代码都是用Scala编写的。 由于我不熟悉Scala,我无法判断我是否会发现它有用甚至将代码转换为Java。 在Spark Java中是否有直接的预测实现?

无法解析主URL:’spark:http:// localhost:18080′

当我试图运行我的代码时,它抛出此Exception : Exception in thread “main” org.apache.spark.SparkException: Could not parse Master URL:spark:http://localhost:18080 这是我的代码: SparkConf conf = new SparkConf().setAppName(“App_Name”).setMaster(“spark:http://localhost:18080”).set(“spark.ui.port”,”18080″); JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000)); String[] filet=new String[]{“Obama”,”ISI”}; JavaReceiverInputDStream reciverStream=TwitterUtils.createStream(ssc,filet); JavaDStream statuses = reciverStream.map(new Function() { public String call(Status status) { return status.getText(); } } ); ssc.start(); ssc.awaitTermination();}} 知道如何解决这个问题?

线程“main”中的exceptionorg.apache.spark.SparkException:此JVM中只能运行一个SparkContext(参见SPARK-2243)

当我尝试使用cassandra运行spark应用程序时,我收到错误。 Exception in thread “main” org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). 我正在使用spark版本1.2.0,很明显我只在我的应用程序中使用了一个spark上下文。 但每当我尝试为流媒体目的添加以下代码时,我都会收到此错误。 JavaStreamingContext activitySummaryScheduler = new JavaStreamingContext( sparkConf, new Duration(1000));

如何从spark设置和获取静态变量?

我有一个class级: public class Test { private static String name; public static String getName() { return name; } public static void setName(String name) { Test.name = name; } public static void print() { System.out.println(name); } } 在我的Spark驱动程序中,我正在设置这样的名称并调用print()命令: public final class TestDriver{ public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(“TestApp”); // … […]

使用Spark从Azure Blob读取数据

我在通过火花流从azure色斑点中读取数据时遇到了问题 JavaDStream lines = ssc.textFileStream(“hdfs://ip:8020/directory”); 上面的代码适用于HDFS,但无法从Azure blob读取文件 https://blobstorage.blob.core.windows.net/containerid/folder1/ 以上是azure UI中显示的路径,但这不起作用,我错过了什么,我们如何访问它。 我知道Eventhub是流数据的理想选择,但我目前的情况要求使用存储而不是队列