Task not serializable on Spark

I have a transformation:

    JavaRDD<Tuple2<String, Long>> mappedRdd = myRDD.values().map(
        new Function<Pageview, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> call(Pageview pageview) throws Exception {
                String key = pageview.getUrl().toString();
                Long value = getDay(pageview.getTimestamp());
                return new Tuple2<>(key, value);
            }
        });

Pageview is a type: Pageview.java

I register that class with Spark as follows:

    Class[] c = new Class[1];
    c[0] = Pageview.class;
    sparkConf.registerKryoClasses(c);

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.map(RDD.scala:286)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
        at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
        at org.apache.gora.tutorial.log.ExampleSpark.run(ExampleSpark.java:100)
        at org.apache.gora.tutorial.log.ExampleSpark.main(ExampleSpark.java:53)
    Caused by: java.io.NotSerializableException: org.apache.gora.tutorial.log.ExampleSpark
    Serialization stack:
        - object not serializable (class: org.apache.gora.tutorial.log.ExampleSpark, value: org.apache.gora.tutorial.log.ExampleSpark@1a2b4497)
        - field (class: org.apache.gora.tutorial.log.ExampleSpark$1, name: this$0, type: class org.apache.gora.tutorial.log.ExampleSpark)
        - object (class org.apache.gora.tutorial.log.ExampleSpark$1, org.apache.gora.tutorial.log.ExampleSpark$1@4ab2775d)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more

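When I debug the code, I see that JavaSerializer.scala is called even though there is a class named KryoSerializer.

PS 1: I don't want to use the Java serializer, but making Pageview implement Serializable does not solve the problem.

PS 2: This does not make the problem go away either: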

    ...
    //String key = pageview.getUrl().toString();
    //Long value = getDay(pageview.getTimestamp());
    String key = "Dummy";
    Long value = 1L;
    return new Tuple2(key, value);
    ...

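I have run into this problem many times with Java code. The serialization stack in the trace shows the cause: the anonymous Function (ExampleSpark$1) has a this$0 field that points at the enclosing ExampleSpark instance, and ExampleSpark is not serializable. Registering Pageview with Kryo does not change this, because the object that fails to serialize is the enclosing class, not Pageview. Although I was using Java serialization, I would either make the class that contains the code Serializable or, if you don't want to do that, make the Function a static member of the class.

Here is a code snippet of the solution.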

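    public class Test {
        // A static field has no enclosing instance, so the anonymous class
        // carries no hidden this$0 reference to a non-serializable object.
        private static Function<Pageview, Tuple2<String, Long>> s =
            new Function<Pageview, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> call(Pageview pageview) throws Exception {
                    String key = pageview.getUrl().toString();
                    // getDay must itself be static to be callable from here.
                    Long value = getDay(pageview.getTimestamp());
                    return new Tuple2<>(key, value);
                }
            };
    }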
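To see the difference without a Spark cluster, here is a minimal sketch that reproduces the capture with plain Java serialization. It makes several assumptions: SerFunction is a hypothetical stand-in for Spark's Function interface, Driver is a hypothetical stand-in for the non-serializable ExampleSpark class, and both getDay bodies are invented for illustration. It only checks whether each function object can be written to an ObjectOutputStream, which is roughly what the ClosureCleaner step in the trace does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureDemo {

    // Hypothetical stand-in for Spark's serializable Function interface.
    public interface SerFunction<T, R> extends Serializable {
        R call(T input) throws Exception;
    }

    // Hypothetical stand-in for the driver class; note it is NOT Serializable.
    public static class Driver {
        // Invented helper: converts epoch milliseconds to a day number.
        public long getDay(long timestampMillis) {
            return timestampMillis / 86_400_000L;
        }

        // Anonymous class created in an instance method. Calling the instance
        // method getDay forces it to hold a reference to Driver.this.
        public SerFunction<Long, Long> capturing() {
            return new SerFunction<Long, Long>() {
                @Override
                public Long call(Long ts) {
                    return getDay(ts);
                }
            };
        }
    }

    // Static variant of the helper, callable without any Driver instance.
    public static long getDayStatic(long timestampMillis) {
        return timestampMillis / 86_400_000L;
    }

    // Anonymous class created in a static context: no enclosing instance exists.
    public static final SerFunction<Long, Long> STATIC_FN =
        new SerFunction<Long, Long>() {
            @Override
            public Long call(Long ts) {
                return getDayStatic(ts);
            }
        };

    // Returns true if Java serialization can write the object, false if some
    // object reachable from it is not Serializable.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Driver().capturing())); // false
        System.out.println(isSerializable(STATIC_FN));                // true
    }
}
```

The first function fails for the same reason as in the question: serializing it drags in the enclosing Driver. Declaring Driver as `implements Serializable` would also make the first case pass, which corresponds to the other option mentioned above, at the cost of shipping the whole enclosing object with every task.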