Spark序列化和Java序列化有什么区别?
我正在使用Spark + Yarn,我有一个我想在分布式节点上调用的服务。
当我在使用java序列化的Junit测试中“手动”序列化此服务对象时,服务的所有内部集合都被很好地序列化和反序列化:
@Test public void testSerialization() { try ( ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml"); FileOutputStream fileOutputStream = new FileOutputStream("myService.ser"); ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream); ) { final MyService service = (MyService) contextBusiness.getBean("myServiceImpl"); objectOutputStream.writeObject(service); objectOutputStream.flush(); } catch (final java.io.IOException e) { logger.error(e.getMessage(), e); } } @Test public void testDeSerialization() throws ClassNotFoundException { try ( FileInputStream fileInputStream = new FileInputStream("myService.ser"); ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream); ) { final MyService myService = (MyService) objectInputStream.readObject(); // HERE a functionnal test who proves the service has been fully serialized and deserialized . } catch (final java.io.IOException e) { logger.error(e.getMessage(), e); } }
但是当我试图通过我的Spark启动器调用这个服务时,我是否广播了服务对象,一些内部集合(一个HashMap)消失了(不是序列化的)就好像它被标记为“瞬态”(但它不是瞬态的)静态的) :
JavaRDD listeInputsRDD = sprkCtx.parallelize(listeInputs, 10); JavaRDD listeOutputsRDD = listeInputsRDD.map(new Function() { private static final long serialVersionUID = 1L; public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception MyOutput output = service.evaluate(input); return (new OutputObject(output)); } });
如果我广播该服务,结果相同:
final Broadcast broadcastedService = sprkCtx.broadcast(service); JavaRDD listeInputsRDD = sprkCtx.parallelize(listeInputs, 10); JavaRDD listeOutputsRDD = listeInputsRDD.map(new Function() { private static final long serialVersionUID = 1L; public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception MyOutput output = broadcastedService.getValue().evaluate(input); return (new OutputObject(output)); } });
如果我以本地模式而不是纱线群集模式启动相同的Spark代码,它可以完美地工作。
所以我的问题是:Spark Serialization和Java Serialization之间有什么区别? (我没有使用Kryo或任何自定义序列化)。
编辑:当我尝试使用Kryo序列化程序(没有明确注册任何类)时,我遇到了同样的问题。
好的,感谢我们的一位实验数据分析师,我发现了它。
那么,这个谜是什么?
- 它不是关于序列化(java或Kryo)
- 这不是关于一些预处理或后处理Spark会在序列化之前/之后进行的
- 它不是关于完全可序列化的HashMap字段(如果你阅读我给出的第一个例子,这个很明显,但不适合所有人;)
所以…
整个问题是关于这个:
“如果我在本地模式而不是纱线群集模式下启动相同的Spark代码,它将完美运行。”
在“纱线群集”模式下,无法初始化集合,因为它是在随机节点上启动的,无法访问磁盘上的初始参考数据。 在本地模式下,当初始数据在磁盘上找不到时,有一个明显的例外,但在集群模式下,它是完全静默的,看起来问题是关于序列化。
使用“纱线客户端”模式为我们解决了这个问题。
- TaskSchedulerImpl:初始作业未接受任何资源;
- 使用mapPartition和迭代器保存spark RDD
- 当从Java应用程序连接到Spark Standalone时,为什么抛出“无法调用已停止的SparkContext上的方法”?
- 使用–jars的spark-submit yarn-cluster不起作用?
- Spark spark-submit –jars参数需要逗号列表,如何声明jar的目录?
- 线程主java.lang.exceptionininitializerError中的exception当没有hadoop安装spark时
- SparkSQL并在Java中的DataFrame上爆炸
- 在Apache Spark中,我可以轻松地重复/嵌套SparkContext.parallelize吗?
- Spark on yarn jar上传问题