Tag: apache spark

任务不可序列化 – Spark Java

我在Spark中得到Task不可序列化的错误。 我已经搜索并尝试使用某些post中建议的静态函数,但它仍然会出现相同的错误。 代码如下: public class Rating implements Serializable { private SparkSession spark; private SparkConf sparkConf; private JavaSparkContext jsc; private static Function mapFunc; public Rating() { mapFunc = new Function() { public Rating call(String str) { return Rating.parseRating(str); } }; } public void runProcedure() { sparkConf = new SparkConf().setAppName(“Filter Example”).setMaster(“local”); jsc = new JavaSparkContext(sparkConf); SparkSession spark […]

Spark ML Pipeline api保存不起作用

在版本1.6中,管道api获得了一组新的function来保存和加载管道阶段。 在我训练分类器并稍后再次加载以重新使用它并节省计算再次建模的努力之后,我尝试将一个阶段保存到磁盘。 出于某种原因,当我保存模型时,该目录仅包含元数据目录。 当我尝试再次加载它时,我得到以下exception: 线程“main”中的exceptionjava.lang.UnsupportedOperationException:org.apache.spark.rdd.RDDOperationScope $中org.apache.spark.rdd.RDD $$ anonfun $ first $ 1.apply(RDD.scala:1330)的空集合.withScope(RDDOperationScope.scala:150)atg.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:111)atg.apache.spark.rdd.RDD.withScope(RDD.scala:316)at org .apache.spark.rdd.RDD.first(RDD.scala:1327)位于org.apache.spark.ml.tuning的org.apache.spark.ml.util.DefaultParamsReader $ .loadMetadata(ReadWrite.scala:284)。 CrossValidator $ SharedReadWrite $ .load(CrossValidator.scala:287)org.apache.spark.ml.tuning.CrossValidatorModel $ CrossValidatorModelReader.load(CrossValidator.scala:393)at org.apache.spark.ml.tuning.CrossValidatorModel $ CrossValidatorModelReader .load(CrossValidator.scala:384)org.apache.spark.ml.util.MLReadable $ class.load(ReadWrite.scala:176)at org.apache.spark.ml.tuning.CrossValidatorModel $ .load(CrossValidator。 scala:368)在org.apache.spark.ml.tuning.CrossVal org.test.categoryminer.spark.SparkTextClassifierModelCache.get中的idatorModel.load(CrossValidator.scala)(SparkTextClassifierModelCache.java:34) 保存我使用的模型: crossValidatorModel.save(“/tmp/my.model”) 并加载它我使用: CrossValidatorModel.load(“/tmp/my.model”) 我调用了在CrossValidator对象上调用fit(dataframe)时得到的CrossValidatorModel对象的save。 任何指针为什么它只保存元数据目录?

Spark 2.0.1写入错误:引起:java.util.NoSuchElementException

我试图将情绪值附加到每个消息,我已经下载了所有stanford核心jar文件作为依赖项: import sqlContext.implicits._ import com.databricks.spark.corenlp.functions._ import org.apache.spark.sql.functions._ val version = “3.6.0” val model = s”stanford-corenlp-$version-models-english” // val jars = sc.listJars if (!jars.exists(jar => jar.contains(model))) { import scala.sys.process._ s”wget http://repo1.maven.org/maven2/edu/stanford/nlp/stanford- corenlp/$version/$model.jar -O /tmp/$model.jar”.!! sc.addJar(s”/tmp/$model.jar”)} val all_messages = spark.read.parquet(“/home/ubuntu/messDS.parquet”) case class AllMessSent (user_id: Int, sent_at: java.sql.Timestamp, message: String) val messDS = all_messages.as[AllMess] 到目前为止,一切都很好,因为我可以执行计算并保存DS case class AllMessSentiment = […]

如何将JavaPairRDD转换为数据集?

SparkSession.createDataset()只允许List, RDD, or Seq – 但它不支持JavaPairRDD 。 因此,如果我有一个我想要创建Dataset的JavaPairRDD ,那么SparkSession.createDataset()限制的可行工作区SparkSession.createDataset()可以创建包含两个字段的包装器UserMap类: String和User 。 然后执行spark.createDataset(userMap, Encoders.bean(UserMap.class)); ?

无法执行超过火花作业“初始作业未接受任何资源”

使用独立的Spark Java来执行下面的代码片段,我得到状态总是等待时出现以下错误。当我尝试添加Print语句时,它不起作用。 是否有任何配置我可能错过了多个工作? 15/09/18 15:02:56 INFO DAGScheduler:从第0阶段提交2个缺失的任务(MapPartitionsRDD [2]在SparkTest.java:143的filter处) 15/09/18 15:02:56 INFO TaskSchedulerImpl:添加任务集0.0,包含2个任务 15/09/18 15:03:11 WARN TaskSchedulerImpl:初始工作没有接受任何资源; 检查群集UI以确保工作人员已注册并具有足够的资源 15/09/18 15:03:26 WARN TaskSchedulerImpl:初始工作没有接受任何资源; 检查群集UI以确保工作人员已注册并具有足够的资源 15/09/18 15:03:41 WARN TaskSchedulerImpl:初始工作没有接受任何资源; 检查群集UI以确保工作人员已注册并具有足够的资源 JavaRDD words = input.flatMap(new FlatMapFunction() //Ln:143 { public Iterable call(String x) { return Arrays.asList(x.split(” “)); } }); // Count all the words System.out.println(“Total words is” + words.count())

如何为每个记录生成唯一ID

我有一个包含MM +记录的庞大数据集,我正在尝试为每条记录分配唯一的ID。 我试过下面的代码但是由于行id是顺序的,所以需要很多时间。 我试过调整内存参数来优化作业,无法获得太多性能。 示例代码段: JavaRDD rawRdd=…… rawRdd.zipWithIndex() .mapToPair(t->new Tuple2(t._2,t._1)) 有没有更好的方法来分配唯一ID? 谢谢

Java中的“Lambdifying”scala函数

使用Java和Apache Spark(已经在Scala中重写),面对旧的API方法( org.apache.spark.rdd.JdbcRDD构造函数),它有AbstractFunction1作为它的参数: abstract class AbstractFunction1[@scala.specialized -T1, @scala.specialized +R]() extends scala.AnyRef with scala.Function1[T1, R] {} 因为AbstractFunction1是一个抽象类,所以我不能使用Java8 lambdas,所以我决定用scala.Function1 trait包装java.util.functions.Function是相同的但是没有实现andThen和compose方法。 结果,我创建了thes接口: import scala.Function1; @FunctionalInterface public interface Funct extends Function1, Serializable { @Override default Function1 compose(Function1 before) { return null; } @Override default Function1 andThen(Function1 g) { return null; } } IDE对此接口没有任何问题,但在编译时,get: [ERROR] Funct is not a functional […]

使用已安装的spark和maven将Spark Scala Program编译为jar文件

仍然试图熟悉maven并将我的源代码编译成jar文件以进行spark-submit。 我知道如何使用IntelliJ,但想了解这实际上是如何工作的。 我有一个EC2服务器,已经安装了所有最新的软件,如spark和scala,并且有我想用maven编译的示例SparkPi.scala源代码。 我的愚蠢问题首先是,我可以使用我安装的软件来构建代码,而不是从maven存储库中检索依赖项,如何从基本的pom.xml模板开始添加适当的需求。 我不完全理解maven正在做什么,我怎么才能测试我的源代码的编译? 据我了解,我只需要有标准的目录结构src/main/scala然后想运行mvn package 。 此外,我想用maven而不是sbt进行测试。

加入一个dataframespark java

首先,感谢您抽出时间阅读我的问题。 我的问题如下:在Spark with Java中,我在两个dataframe中加载了两个csv文件的数据。 这些数据框将具有以下信息。 Dataframe机场 Id | Name | City ———————– 1 | Barajas | Madrid Dataframe airport_city_state City | state —————- Madrid | España 我想加入这两个dataframe,使它看起来像这样: dataframe结果 Id | Name | City | state ————————– 1 | Barajas | Madrid | España 其中dfairport.city = dfaiport_city_state.city 但是我不能用语法来澄清所以我可以正确地进行连接。 我如何创建变量的一些代码: // Load the csv, you have to […]

BroadCast变量在Spark程序中发布

在spark-java程序中,我需要读取一个配置文件并填充HashMap,我需要将其作为广播变量发布,以便它可以在所有数据节点上使用。 我需要在CustomInputFormat类中获取此广播变量的值,该类将在datanode中运行。 我如何在我的CustomInputFormat类中指定从特定广播变量中获取值,因为广播变量是在我的驱动程序中声明的? 我正在添加一些代码来解释它: 在这个场景1我在驱动程序本身使用它,即变量在同一个类中使用:这里我可以使用Broadcat.value()方法 > final Broadcast signPrefixes = > sc.broadcast(loadCallSignTable()); > JavaPairRDD countryContactCounts = contactCounts.mapToPair( > new PairFunction<Tuple2, String, Integer> (){ > public Tuple2 call(Tuple2 callSignCount) { > String sign = callSignCount._1(); > String country = lookupCountry(sign, signPrefixes.value()); > return new Tuple2(country, callSignCount._2()); > }}).reduceByKey(new SumInts()); 在场景2中,我将在自定义输入格式类中使用广播变量: 司机计划: > final JavaSparkContext sc= new […]