Tag: apache spark

使用RabbitMQ源的Spark结构化流式传输

我正在尝试编写一个Structured Streaming的自定义接收器,它将使用来自RabbitMQ消息。 Spark 最近发布了 DataSource V2 API,这似乎非常有前景。 由于它抽象了很多细节,我想在简单性和性能方面使用这个API。 但是,由于它很新,因此可用的资源不多。 我需要经验丰富的Spark人员做一些澄清,因为他们会更容易掌握关键点。 开始了: 我的出发点是博客文章系列,第一部分在这里 。 它显示了如何在没有流function的情况下实现数据源。 为了制作流媒体源,我略微改变了它们,因为我需要实现MicroBatchReadSupport而不是(或除了) DataSourceV2 。 为了提高效率,让多个spark执行器同时使用RabbitMQ是明智的,即从同一队列中消耗RabbitMQ 。 如果我没有感到困惑,输入的每个分区-in Spark的术语 – 对应于来自队列的消费者-in RabbitMQ术语。 因此,我们需要为输入流分配多个分区,对吧? 与本系列的第4部分类似,我实现了MicroBatchReader ,如下所示: @Override public List<DataReaderFactory> createDataReaderFactories() { int partition = options.getInt(RMQ.PARTITICN, 5); List<DataReaderFactory> factories = new LinkedList(); for (int i = 0; i < partition; i++) { factories.add(new RMQDataReaderFactory(options)); } […]

是否存在查看相邻元素的RDD转换函数?

有人知道在转换过程中是否有办法查看已排序的RDD中的相邻元素? 我知道我可以收集然后执行下面示例中的操作,但是它有点破坏了分布式系统的目的,我试图利用它分布的事实。 例: RDD of(string name,int val)映射到RDD(string name,int val,int diff) 这样: name | val becomes -> name | val | diff (current – prior) a | 3 a | 3 | 3 b | 6 b | 6 | 3 c | 4 c | 4 | -2 d | 20 d | 20 | […]

强制分区存储在特定执行程序中

我有5个parititions-RDD和5个工人/执行者。 我怎样才能让Spark将每个RDD的分区保存在不同的worker(ip)上? 如果我说Spark可以在一个工作人员上保存几个分区,而在其他工作人员上有0个分区,我是对的吗? 我可以指定分区数,但Spark仍然可以在单个节点上缓存所有内容。 复制不是一种选择,因为RDD是巨大的。 我找到的解决方法 getPreferredLocations RDD的getPreferredLocations方法不提供100%保证该分区将存储在指定节点上。 Spark将在spark.locality.wait期间spark.locality.wait ,但之后Spark将在不同节点上缓存分区。 作为workarround ,您可以为spark.locality.wait设置非常高的值并覆盖getPreferredLocations 。 坏消息 – 你不能用Java做到这一点,你需要编写Scala代码。 至少Scala内部包含Java代码。 即: class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) { val nodeIPs = Array(“192.168.2.140″,”192.168.2.157″,”192.168.2.77”) override def getPreferredLocations(split: Partition): Seq[String] = Seq(nodeIPs(split.index % nodeIPs.length)) } SparkContext的makeRDD SparkContext有makeRDD方法 。 这种方法缺乏文献记载。 据我所知,我可以指定首选位置,而不是设置spark.locality.wait高值。 坏消息 – 首选位置将在第一次shuffle / join / cogroup操作中被丢弃 。 这两种方法都有一个太高spark.locality.wait缺点,如果一些节点不可用,可能会导致您的集群sturve。 PS更多背景 我有多达10,000个sales-XXX.parquet文件,每个文件代表不同地区不同商品的销售情况。 […]

将JavaRDD转换为DataFrame时出现Spark错误:java.util.Arrays $ ArrayList不是数组模式的有效外部类型

我使用的是Spark 2.1.0。 对于以下代码,它读取文本文件并将内容转换为DataFrame,然后输入Word2Vector模型: SparkSession spark = SparkSession.builder().appName(“word2vector”).getOrCreate(); JavaRDD lines = spark.sparkContext().textFile(“input.txt”, 10).toJavaRDD(); JavaRDD<List> lists = lines.map(new Function<String, List>(){ public List call(String line){ List list = Arrays.asList(line.split(” “)); return list; } }); JavaRDD rows = lists.map(new Function<List, Row>() { public Row call(List list) { return RowFactory.create(list); } }); StructType schema = new StructType(new StructField[] { new […]

Spark on yarn jar上传问题

我正在尝试使用spark over yarn运行一个简单的Map / Reduce java程序(CentOS上的Cloudera Hadoop 5.2)。 我试过这2种不同的方式。 第一种方式如下: YARN_CONF_DIR=/usr/lib/hadoop-yarn/etc/hadoop/; /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit –class MRContainer –master yarn-cluster –jars /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar simplemr.jar 此方法给出以下错误: 诊断:应用程序application_1434177111261_0007失败2次,因为AM容器的appattempt_1434177111261_0007_000002退出,退出时使用exitCode:-1000,原因是:资源hdfs:// kc1ltcld29:9000 / user / myuser / .sparkStaging / application_1434177111261_0007 / spark-assembly-1.4.0-hadoop2。在src文件系统上更改了4.0.jar(预计1434549639128,是1434549642191 然后我试着没有–jars: YARN_CONF_DIR=/usr/lib/hadoop-yarn/etc/hadoop/; /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit –class MRContainer –master yarn-cluster simplemr.jar 诊断:应用程序application_1434177111261_0008由于AM容器导致appattempt_1434177111261_0008_000002退出,失败了2次,因为:文件不存在:hdfs:// kc1ltcld29:9000 / user / myuser / .sparkStaging / application_1434177111261_0008 / spark-assembly-1.4。 0-hadoop2.4.0.jar。试图这个尝试..申请失败。 ApplicationMaster主机:N / […]

在Javardd排序

我用java来激发你的兴趣。 我想对我的地图进行排序。 事实上,我有这样的javaRDD: JavaPairRDD rebondCountURL = session_rebond_2.mapToPair(new PairFunction<Tuple2, String, String>() { @Override public Tuple2 call(Tuple2 stringStringTuple2) throws Exception { return new Tuple2(stringStringTuple2._2, stringStringTuple2._1); } }).groupByKey().map(new PairFunction<Tuple2<String, Iterable>, Tuple2>() { @Override public Tuple2 call(Tuple2<String, Iterable> stringIterableTuple2) throws Exception { Iterable strings = stringIterableTuple2._2; List b = new ArrayList(); for (String s : strings) { b.add(s); } […]

Spark流式传输DStream RDD以获取文件名

Spark流textFileStream和fileStream可以监视目录并处理Dstream RDD中的新文件。 如何在特定时间间隔内获取DStream RDD正在处理的文件名?

为什么Apache Spark在客户端上执行filter

作为新手上的apache引发了一些关于在Spark上获取Cassandra数据的问题。 List dates = Arrays.asList(“2015-01-21″,”2015-01-22”); CassandraJavaRDD aRDD = CassandraJavaUtil.javaFunctions(sc). cassandraTable(“testing”, “cf_text”,CassandraJavaUtil.mapRowTo(A.class, colMap)). where(“Id=? and date IN ?”,”Open”,dates); 此查询不过滤cassandra服务器上的数据。 虽然这个java语句正在执行它的内存并最终抛出spark java.lang.OutOfMemoryErrorexception。 查询应该过滤掉cassandra服务器而不是客户端上的数据,如https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md所述 。 虽然我正在使用cassandra cqlsh上的filter执行查询,但它执行正常但执行查询而没有filter(where子句)正在给出预期的超时。 因此很明显,火花并没有在客户端应用filter。 SparkConf conf = new SparkConf(); conf.setAppName(“Test”); conf.setMaster(“local[8]”); conf.set(“spark.cassandra.connection.host”, “192.168.1.15”) 为什么在客户端应用filter以及如何改进它以在服务器端应用filter。 我们如何在Windows平台上的cassandra集群上配置spark集群?

线程“main”中的exceptionorg.apache.spark.SparkException:此JVM中只能运行一个SparkContext(参见SPARK-2243)

当我尝试使用cassandra运行spark应用程序时,我收到错误。 Exception in thread “main” org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). 我正在使用spark版本1.2.0,很明显我只在我的应用程序中使用了一个spark上下文。 但每当我尝试为流媒体目的添加以下代码时,我都会收到此错误。 JavaStreamingContext activitySummaryScheduler = new JavaStreamingContext( sparkConf, new Duration(1000));

在Apache spark中,使用mapPartitions和组合使用广播变量和map之间的区别是什么

在Spark中,我们使用广播变量使每台机器只读变量的副本。 我们通常在闭包之外创建一个广播变量(例如闭包所需的查找表)以提高性能。 我们还有一个名为mapPartitions的spark转换运算符,它试图实现相同的function(使用共享变量来提高性能)。 例如,在mapPartitions中,我们可以为每个分区共享数据库连接。 那么这两者有什么区别? 我们可以互换地使用它来共享变量吗?