flink – 使用匕首注射 – 不可序列化?

我使用Flink(最新通过git)从kafka流到cassandra。 为了简化unit testing我通过Dagger添加dependency injection。

ObjectGraph似乎正在正确设置自己,但是’内部对象’被Flink标记为“不可序列化”。 如果我直接包含这些对象,它们会起作用 – 那么区别是什么?

有问题的类实现了MapFunction@Inject一个用于cassandra的模块和一个用于读取配置文件的模块。

有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能?


编辑:

fwiw – dependency injection(通过匕首)和RichMapFunction不能共存。 Dagger不允许您包含任何在其定义中扩展的对象。

进一步:

通过Dagger Lazy 实例化的对象也不会序列化。

线程“main”中的exceptionorg.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化

引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1

在深入讨论问题的具体细节之前,有关Apache Flink中函数可序列化的一些背景知识:

串行化

Apache Flink使用Java Serialization(java.io.Serializable)将函数对象(此处为MapFunction )发送给并行执行它们的worker。 因此,函数需要是可序列化的:函数可能不包含任何非可序列化的字段,即非原始类型(int,long,double,…)并且不实现java.io.Serializable

使用非可序列化构造的典型方法是懒惰地初始化它们。

延迟初始化

在Flink函数中使用非可序列化类型的一种方法是懒惰地初始化它们。 当序列化函数时,保存这些类型的字段仍为null ,并且仅在函数被函数反序列化后设置。

  • 在Scala中,您可以简单地使用惰性字段,例如lazy val x = new NonSerializableType()NonSerializableType类型实际上只在第一次访问变量x创建,变量x通常在worker上。 因此,类型可以是不可序列化的,因为当函数序列化为运送给工作者时, x为null。

  • 在Java中,如果将其设置为Rich Function ,则可以在函数的open()方法上初始化非可序列化字段。 丰富的函数(如RichMapFunction )是基本函数的扩展版本(此处为MapFunction ),并允许您访问生命周期方法,如open()close()

懒惰依赖注射

我不太熟悉dependency injection,但是dagger似乎也提供了类似懒惰的依赖,这可能有助于解决方法,就像Scala中的惰性变量一样:

 new MapFunction() { @Inject Lazy dep; public Long map(Long value) { return dep.get().doSomething(value); } }