Apache Spark中的数据集
Dataset ds = sc.read().json("path").as(Encoders.bean(Tweet.class)); ds.show(); JavaRDD dstry = ds.toJavaRDD(); System.out.println(dstry.first().getClass());
Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)" at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:197) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1325) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1322) at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:90) at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:89) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)" at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1435) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
当我仔细观察时,我唯一提出疑问的是:
找不到适用于实际参数的构造函数/方法“org.apache.spark.unsafe.types.UTF8String”; 候选人是:“public void sparkSQL.Tweet.setId(long)”
正如@ user9718686所写,id字段有不同的类型:json文件中的String
,类定义中的long
。 当您将其读入Dataset
,Spark会从文件中推断出架构并检测到该ID是String
类型,这就是为什么它在您尝试打印时有效(正如您在其中一条评论中提到的那样) )。 如果您希望将dataframe设置为Dataset
,则必须将json文件更改为使用long
ID而不是String
或者当您尝试对dataframe执行任何操作操作时,可以让Spark转换此id。
Dataset rowDataset = sc.read().json("path"); Dataset tweetDataset = rowDataset .withColumn("id", rowDataset.col("id").cast(DataTypes.LongType)) .as(Encoders.bean(Tweet.class)); tweetDataset.printSchema(); System.out.println(tweetDataset.head().getId());
由于类型不匹配,它会给您一个错误:
-
Tweet
类将id
字段定义为Long
。 - 您的数据的
id
为String
。
您必须转换输入或调整类定义。
- SparkSQL并在Java中的DataFrame上爆炸
- Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行
- Spark Local Mode – 所有作业仅使用一个CPU核心
- 无法找到Web UI的资源路径:org / apache / spark / ui / static创建Spark应用程序时
- 如何在Java Spark RDD上执行标准偏差和平均操作?
- 如何在Spark中将JavaPairInputDStream转换为DataSet / DataFrame
- Spark的Column.isin函数不带List
- 使用Apache Spark从Amazon S3解析文件
- Spark序列化和Java序列化有什么区别?