Spark序列化和Java序列化有什么区别?

我正在使用Spark + Yarn,我有一个我想在分布式节点上调用的服务。

当我在使用java序列化的Junit测试中“手动”序列化此服务对象时,服务的所有内部集合都被很好地序列化和反序列化:

@Test public void testSerialization() { try ( ConfigurableApplicationContext contextBusiness = new ClassPathXmlApplicationContext("spring-context.xml"); FileOutputStream fileOutputStream = new FileOutputStream("myService.ser"); ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileOutputStream); ) { final MyService service = (MyService) contextBusiness.getBean("myServiceImpl"); objectOutputStream.writeObject(service); objectOutputStream.flush(); } catch (final java.io.IOException e) { logger.error(e.getMessage(), e); } } @Test public void testDeSerialization() throws ClassNotFoundException { try ( FileInputStream fileInputStream = new FileInputStream("myService.ser"); ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream); ) { final MyService myService = (MyService) objectInputStream.readObject(); // HERE a functionnal test who proves the service has been fully serialized and deserialized . } catch (final java.io.IOException e) { logger.error(e.getMessage(), e); } } 

但是当我试图通过我的Spark启动器调用这个服务时,我是否广播了服务对象,一些内部集合(一个HashMap)消失了(不是序列化的)就好像它被标记为“瞬态”(但它不是瞬态的)静态的) :

 JavaRDD listeInputsRDD = sprkCtx.parallelize(listeInputs, 10); JavaRDD listeOutputsRDD = listeInputsRDD.map(new Function() { private static final long serialVersionUID = 1L; public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception MyOutput output = service.evaluate(input); return (new OutputObject(output)); } }); 

如果我广播该服务,结果相同:

 final Broadcast broadcastedService = sprkCtx.broadcast(service); JavaRDD listeInputsRDD = sprkCtx.parallelize(listeInputs, 10); JavaRDD listeOutputsRDD = listeInputsRDD.map(new Function() { private static final long serialVersionUID = 1L; public OutputObject call(InputOjbect input) throws TarificationXmlException { // Exception MyOutput output = broadcastedService.getValue().evaluate(input); return (new OutputObject(output)); } }); 

如果我以本地模式而不是纱线群集模式启动相同的Spark代码,它可以完美地工作。

所以我的问题是:Spark Serialization和Java Serialization之间有什么区别? (我没有使用Kryo或任何自定义序列化)。

编辑:当我尝试使用Kryo序列化程序(没有明确注册任何类)时,我遇到了同样的问题。

好的,感谢我们的一位实验数据分析师,我发现了它。

那么,这个谜是什么?

  • 它不是关于序列化(java或Kryo)
  • 这不是关于一些预处理或后处理Spark会在序列化之前/之后进行的
  • 它不是关于完全可序列化的HashMap字段(如果你阅读我给出的第一个例子,这个很明显,但不适合所有人;)

所以…

整个问题是关于这个:

“如果我在本地模式而不是纱线群集模式下启动相同的Spark代码,它将完美运行。”

在“纱线群集”模式下,无法初始化集合,因为它是在随机节点上启动的,无法访问磁盘上的初始参考数据。 在本地模式下,当初始数据在磁盘上找不到时,有一个明显的例外,但在集群模式下,它是完全静默的,看起来问题是关于序列化。

使用“纱线客户端”模式为我们解决了这个问题。