如何修复java.lang.ClassCastException:无法将scala.collection.immutable.List的实例分配给字段类型scala.collection.Seq?

这个错误一直是最难追踪的。 我不知道发生了什么事。 我在我的位置机器上运行Spark集群。 所以整个火花集群在一个主机127.0.0.1 ,我在一个独立模式下运行

 JavaPairRDD<byte[], Iterable> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" ) .select("rowkey", "col1", "col2", "col3", ) .spanBy(new Function() { @Override public byte[] call(CassandraRow v1) { return v1.getBytes("rowkey").array(); } }, byte[].class); Iterable<Tuple2<byte[], Iterable>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE Tuple2<byte[], Iterable> tuple = listOftuples.iterator().next(); byte[] partitionKey = tuple._1(); for(CassandraRow cassandraRow: tuple._2()) { System.out.println("************START************"); System.out.println(new String(partitionKey)); System.out.println("************END************"); } 

这个错误一直是最难追踪的。 它显然发生在cassandraRowsRDD.collect() ,我不知道为什么?

 16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21) java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 

这是我使用的版本

 Scala code runner version 2.11.8 // when I run scala -version or even ./spark-shell compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0' compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0' compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0' compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3': 

我的gradle文件在介绍一些名为“提供”的内容后看起来像这样,实际上似乎并不存在,但谷歌说要创建一个,所以我的build.gradle看起来像这样

 group 'com.company' version '1.0-SNAPSHOT' apply plugin: 'java' apply plugin: 'idea' repositories { mavenCentral() mavenLocal() } configurations { provided } sourceSets { main { compileClasspath += configurations.provided test.compileClasspath += configurations.provided test.runtimeClasspath += configurations.provided } } idea { module { scopes.PROVIDED.plus += [ configurations.provided ] } } dependencies { compile 'org.slf4j:slf4j-log4j12:1.7.12' provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0' provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0' provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0' provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3' } jar { from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } } // with jar from sourceSets.test.output manifest { attributes 'Main-Class': "com.company.batchprocessing.Hello" } exclude 'META-INF/.RSA', 'META-INF/.SF', 'META-INF/*.DSA' zip64 true } 

我有同样的问题,可以通过将我的应用程序的jar添加到spark的classpath来解决它

 spark = SparkSession.builder() .appName("Foo") .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar") 

我已经遇到了同样的例外,并且已经深入研究了多个相关的Jiras( 9219,12675,18075 )。

我认为exception名称令人困惑,真正的问题是spark集群和驱动程序应用程序之间的环境设置不一致

例如,我在conf/spark-defaults.conf使用以下行启动了我的Spark集群:

 spark.master spark://master:7077 

当我用一行开始我的驱动程序(甚至程序以spark-submit启动)时:

 sparkSession.master("spark://:7077") 

其中是节点master的正确IP地址,但由于这种简单的不一致,程序将失败。

因此,我建议所有驱动程序应用程序都使用spark-submit启动,并且不要复制驱动程序代码中的任何配置(除非您需要覆盖某些配置)。 也就是说,让spark-submit在运行的Spark集群中以相同的方式设置您的环境。

你应该调用()方法返回byte [],如下所示。

 @Override public byte[] call(CassandraRow v1) { return v1.getBytes("rowkey").array(); } 

如果您仍然遇到问题,请检查Jira中提到的依赖项版本https://issues.apache.org/jira/browse/SPARK-9219

在我的情况下,我不得不添加spark-avro jar(我把它放在主jar旁边的/lib文件夹中):

 SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate(); ... spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar"); 

检查代码 – 在Intellij中:分析… – >检查代码。 如果您已弃用与序列化相关的方法,请修复它。 或者只是尝试减少Spark或Scala版本。 在我的情况下,我将Scala版本减少到2.10并且一切正常。