运行apache spark job时,任务不可序列化exception
编写以下java程序来试验apache spark。
程序尝试从相应的文件中读取正面和负面单词列表,将其与主文件进行比较并相应地过滤结果。
import java.io.Serializable; import java.io.FileNotFoundException; import java.io.File; import java.util.*; import java.util.Iterator; import java.util.List; import java.util.List; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.Function; public class SimpleApp implements Serializable{ public static void main(String[] args) { String logFile = "/tmp/master.txt"; // Should be some file on your system String positive = "/tmp/positive.txt"; // Should be some file on your system String negative = "/tmp/negative.txt"; // Should be some file on your system JavaSparkContext sc = new JavaSparkContext("local[4]", "Twitter Analyzer", "/home/welcome/Downloads/spark-1.1.0/", new String[]{"target/scala-2.10/Simple-assembly-0.1.0.jar"}); JavaRDD positiveComments = sc.textFile(logFile).cache(); List positiveList = GetSentiments(positive); List negativeList= GetSentiments(negative); final Iterator iterator = positiveList.iterator(); int i = 0; while (iterator.hasNext()) { JavaRDD numAs = positiveComments.filter(new Function() { public Boolean call(String s) { return s.contains(iterator.next()); } }); numAs.saveAsTextFile("/tmp/output/"+ i); i++; } } public static List GetSentiments(String fileName) { List input = new ArrayList(); try { Scanner sc = new Scanner(new File(fileName)); while (sc.hasNextLine()) { input.add(sc.nextLine()); } } catch (FileNotFoundException e){ // do stuff here.. } return input; } }
执行spark job时抛出以下错误,
Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.filter(RDD.scala:282) at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78) at SimpleApp.main(SimpleApp.java:37) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 12 more
任何指针?
当您创建匿名类时,编译器会执行以下操作:
JavaRDD numAs = positiveComments.filter(new Function() { public Boolean call(String s) { return s.contains(iterator.next()); } });
它将被重写为:
JavaRDD numAs = positiveComments.filter(new Function() { private Iterator<...> $iterator; public Boolean call(String s) { return s.contains($iterator.next()); } });
这就是为什么你可以有一个NotSerializableException
因为Iterator是不可序列化的。
为避免这种情况,只需提取下一个结果:
String value = iterator.next(); JavaRDD numAs = positiveComments.filter(new Function() { public Boolean call(String s) { return s.contains(value); } });
一些Java事实
- 外部类中定义的任何匿名类都引用外部类。
- 如果需要序列化匿名类,它将强制您将外部类序列化。
- 在lambda函数内部,如果使用封闭类的方法,则需要序列化类,如果正在序列化lambda函数。
关于Spark的一些事实。
- 在Same Executor上,多个任务可以在同一个JVM中同时运行,因为任务在spark中生成为线程。
- 任何与spark转换函数(map,mapPartitions,keyBy,redudeByKey …)一起使用的lambda,Anonymous Class都将在驱动程序上实例化,序列化并发送给执行程序。
- 序列化对象意味着将其状态转换为字节流,以便可以将字节流还原为对象的副本。
- 如果Java对象的类或其任何超类实现java.io.Serializable接口或其子接口java.io.Externalizable,并且其所有非瞬态非静态字段都是可序列化的,则Java对象是可序列化的。
避免序列化问题的经验法则:
- 避免使用匿名类,而是使用静态类作为匿名类将强制您将外部类序列化。
- 避免使用静态变量来解决序列化问题,因为Multiple Task可以在同一个JVM中运行,而静态实例可能不是线程安全的。
- 使用瞬态变量来避免序列化问题,您必须在函数调用内而不是构造函数中初始化它们。 在驱动程序上将调用构造函数,在Executor上它将反序列化并为对象。 初始化的唯一方法是在函数调用中。
- 使用Static类代替匿名类。
- 宗教上只为仅需要序列化的类遵循“附加实现Serializable”
- 在“lambda函数”中,永远不会直接引用outclass方法,因为这会导致外部类的序列化。
- 如果需要在Lambda函数中直接使用方法,请将方法设为静态,否则直接使用Class :: func()notion而不是func()
- Java Map <>不实现Serializable,但HashMap实现。
- 在决定使用Braodcast和Raw DataStructures时要明智。 如果您看到真正的好处,那么只使用广播。
如需深入了解,请访问http://bytepadding.com/big-data/spark/understanding-spark-serialization/