java.lang.ClassCastException使用远程服务器上的spark作业中的lambda表达式

我正在尝试使用sparkjava.com框架为我的apache spark作业构建一个web api。 我的代码是:

@Override public void init() { get("/hello", (req, res) -> { String sourcePath = "hdfs://spark:54310/input/*"; SparkConf conf = new SparkConf().setAppName("LineCount"); conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" }); File configFile = new File("config.properties"); String sparkURI = "spark://hamrah:7077"; conf.setMaster(sparkURI); conf.set("spark.driver.allowMultipleContexts", "true"); JavaSparkContext sc = new JavaSparkContext(conf); @SuppressWarnings("resource") JavaRDD log = sc.textFile(sourcePath); JavaRDD lines = log.filter(x -> { return true; }); return lines.count(); }); } 

如果我删除lambda表达式或将其放在一个简单的jar而不是web服务(不知何故是一个servlet)中,它将运行而没有任何错误。 但是在servlet中使用lambda表达式会导致此exception:

 15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 

PS:我尝试过jerseypark与jetty,tomcat和resin的组合,所有这些都让我得到了同样的结果。

你在这里有一个后续错误掩盖原始错误。

当lambda实例被序列化时,它们使用writeReplace从持久forms(即SerializedLambda实例)中解散其JRE特定实现。 恢复SerializedLambda实例后,将调用其readResolve方法以重新构建相应的lambda实例。 正如文档所说,它将通过调用定义原始lambda的类的特殊方法来实现(请参阅此答案 )。 重要的一点是,需要原始类,这就是你的案例中缺少的。

但是有一个…特殊的…… ObjectInputStream行为。 遇到exception时,它不会立即纾困。 它将记录exception并继续进程,标记当前正在读取的所有对象,因此依赖于错误对象也是错误的。 只有在进程结束时才会抛出它遇到的原始exception。 令它如此奇怪的是,它还将继续尝试设置这些对象的字段。 但是当你看到方法ObjectInputStream.readOrdinaryObject 1806时:

 … if (obj != null && handles.lookupException(passHandle) == null && desc.hasReadResolveMethod()) { Object rep = desc.invokeReadResolve(obj); if (unshared && rep.getClass().isArray()) { rep = cloneArray(rep); } if (rep != obj) { handles.setObject(passHandle, obj = rep); } } return obj; } 

您会看到,当lookupException报告非nullexception时,它不会调用readResolve方法。 但是当替换没有发生时,继续尝试设置引用者的字段值并不是一个好主意,但这正是在这里发生的事情,因此产生了一个ClassCastException

您可以轻松地重现问题:

 public class Holder implements Serializable { Runnable r; } public class Defining { public static Holder get() { final Holder holder = new Holder(); holder.r=(Runnable&Serializable)()->{}; return holder; } } public class Writing { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException { try(FileOutputStream os=new FileOutputStream(f); ObjectOutputStream oos=new ObjectOutputStream(os)) { oos.writeObject(Defining.get()); } System.out.println("written to "+f); } } public class Reading { static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser"); public static void main(String... arg) throws IOException, ClassNotFoundException { try(FileInputStream is=new FileInputStream(f); ObjectInputStream ois=new ObjectInputStream(is)) { Holder h=(Holder)ois.readObject(); System.out.println(hr); hrrun(); } System.out.println("read from "+f); } } 

编译这四个类并运行Writing 。 然后删除类文件Defining.class并运行Reading 。 然后你会得到一个

 Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261) 

(经过1.8.0_20测试)


最重要的是,一旦了解了正在发生的事情,您可能会忘记这个序列化问题,解决问题所需要做的就是确保定义lambda表达式的类在lambda所在的运行时中也可用。反序列化。

Spark Job直接从IDE运行的示例(默认情况下spark-submit分配jar):

 SparkConf sconf = new SparkConf() .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory") .set("spark.eventLog.enabled", "true") .setJars(new String[]{"/path/to/jar/with/your/class.jar"}) .setMaster("spark://spark.standalone.uri:7077"); 

我想你的问题是自动装箱失败了。 在代码中

 x -> { return true; } 

你传递( String->boolean )lambda(它是Predicate ),而filter方法需要( String->Boolean )lambda(它是Function )。 所以我建议你改变代码

 x -> { return Boolean.TRUE; } 

请在您的问题中加入详细信息。 uname -ajava -version输出java -version赞赏。 尽可能提供sscce 。

我有同样的错误,我用内部类替换lambda,然后它工作。 我真的不明白为什么,再现这个错误是非常困难的(我们有一台服务器展示了这种行为,而不是其他地方)。

导致序列化问题 (使用lambdas,导致SerializedLambda错误)

 this.variable = () -> { ..... } 

Yields java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

作品

 this.variable = new MyInterface() { public void myMethod() { ..... } }; 

您可以使用spark.scala.Function更简单地重新编写Java8 lambda

更换

 output = rdds.map(x->this.function(x)).collect() 

有:

 output = rdds.map(new Function(){ public Double call(Double x){ return MyClass.this.function(x); } }).collect();