RDD not serializable: Cassandra/Spark connector Java API

So I previously had some questions about how to query Cassandra using Spark in a Java Maven project: Querying data in Cassandra via Spark in a Java Maven project

Well, my question was answered and it worked, but I've run into a problem (possibly an issue). I'm trying to use the DataStax Java API. Here is my code:

    package com.angel.testspark.test2;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    import java.io.Serializable;

    import static com.datastax.spark.connector.CassandraJavaUtil.*;

    public class App
    {
        // firstly, we define a bean class
        public static class Person implements Serializable {
            private Integer id;
            private String fname;
            private String lname;
            private String role;

            // Remember to declare no-args constructor
            public Person() { }

            public Integer getId() { return id; }
            public void setId(Integer id) { this.id = id; }

            public String getfname() { return fname; }
            public void setfname(String fname) { this.fname = fname; }

            public String getlname() { return lname; }
            public void setlname(String lname) { this.lname = lname; }

            public String getrole() { return role; }
            public void setrole(String role) { this.role = role; }

            // other methods, constructors, etc.
        }

        private transient SparkConf conf;

        private App(SparkConf conf) {
            this.conf = conf;
        }

        private void run() {
            JavaSparkContext sc = new JavaSparkContext(conf);
            createSchema(sc);
            sc.stop();
        }

        private void createSchema(JavaSparkContext sc) {
            JavaRDD<String> rdd = javaFunctions(sc)
                    .cassandraTable("tester", "empbyrole", Person.class)
                    .where("role=?", "IT Engineer")
                    .map(new Function<Person, String>() {
                        @Override
                        public String call(Person person) throws Exception {
                            return person.toString();
                        }
                    });
            System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
        }

        public static void main(String[] args) {
            if (args.length != 2) {
                System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
                System.exit(1);
            }
            SparkConf conf = new SparkConf();
            conf.setAppName("Java API demo");
            conf.setMaster(args[0]);
            conf.set("spark.cassandra.connection.host", args[1]);

            App app = new App(conf);
            app.run();
        }
    }

Here is my error:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Now I know exactly where my error is. It is System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray())); because I need to convert the rdd into an array. However, the API documentation SAID I should be able to do this... this is code copied and pasted straight from the documentation. Why can't I serialize the RDD into an array?
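(A note on that line: the array conversion itself is not what fails to serialize. toArray() is simply the first action on the RDD, and running an action is what forces Spark to serialize the map Function and ship it to the executors, which is where the exception points. In later Spark releases toArray() was deprecated in favor of the equivalent collect(); a minimal sketch, reusing the imports already present in the class above:)

    // collect() (the successor to the deprecated toArray()) materializes the
    // RDD on the driver. Triggering this action is the moment Spark attempts
    // to serialize the map Function, which is where the exception originates.
    java.util.List<String> results = rdd.collect();
    System.out.println("Data as Person beans: \n" + StringUtils.join(results, "\n"));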

I have already inserted dummy data into my Cassandra using the insert statements included in the post linked above.

Also, a previous error I resolved was when I changed all of my getters and setters to lowercase. When I used capital letters in them, it produced an error. Why can't I use capital letters in my getters and setters?
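(As an aside, the standard JavaBean convention capitalizes the first letter of the property name inside its accessors, so a field fname would normally pair with getFname()/setFname(). A sketch of the conventional form, under the unverified assumption that the connector resolves the Cassandra column fname to the bean property of the same name:)

    // Conventional JavaBean accessors for a property named "fname".
    // ASSUMPTION (not verified against the connector): the column "fname"
    // is matched to the bean property "fname", so the accessors must follow
    // the getFname/setFname capitalization pattern for that property name.
    public String getFname() { return fname; }
    public void setFname(String fname) { this.fname = fname; }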

Thanks, Angel

Changing public class App to public class App implements Serializable should fix the error. Because a Java inner class keeps a reference to its enclosing class, your Function object has a reference to App. Since Spark needs to serialize your Function object, it requires that App also be serializable.
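For example, either of these changes avoids the failure (a minimal sketch; the PersonToString name is just for illustration):

    // Option 1: make the enclosing class serializable, so the anonymous
    // inner Function's implicit reference to App can be serialized too.
    public class App implements Serializable {
        // ... body unchanged ...
    }

    // Option 2: avoid the capture entirely. A static nested class (or a
    // separate top-level class) holds no hidden reference to an App
    // instance, so App no longer needs to be Serializable.
    public static class PersonToString implements Function<Person, String> {
        @Override
        public String call(Person person) throws Exception {
            return person.toString();
        }
    }

    // ... and in createSchema():
    JavaRDD<String> rdd = javaFunctions(sc)
            .cassandraTable("tester", "empbyrole", Person.class)
            .where("role=?", "IT Engineer")
            .map(new PersonToString());

Option 2 is often preferable in larger jobs, since it keeps the serialized closure small instead of dragging the whole App object (and everything it references) into every task.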