flink – 使用匕首注射 – 不可序列化?
我使用Flink(最新通过git)从kafka流到cassandra。 为了简化unit testing我通过Dagger添加dependency injection。
ObjectGraph似乎正在正确设置自己,但是’内部对象’被Flink标记为“不可序列化”。 如果我直接包含这些对象,它们会起作用 – 那么区别是什么?
有问题的类实现了MapFunction和@Inject一个用于cassandra的模块和一个用于读取配置文件的模块。
有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能?
编辑:
fwiw – dependency injection(通过匕首)和RichMapFunction不能共存。 Dagger不允许您包含任何在其定义中扩展的对象。
进一步:
通过Dagger Lazy 实例化的对象也不会序列化。
线程“main”中的exceptionorg.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化
…
引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1
在深入讨论问题的具体细节之前,有关Apache Flink中函数可序列化的一些背景知识:
串行化
Apache Flink使用Java Serialization(java.io.Serializable)将函数对象(此处为MapFunction
)发送给并行执行它们的worker。 因此,函数需要是可序列化的:函数可能不包含任何非可序列化的字段,即非原始类型(int,long,double,…)并且不实现java.io.Serializable
。
使用非可序列化构造的典型方法是懒惰地初始化它们。
延迟初始化
在Flink函数中使用非可序列化类型的一种方法是懒惰地初始化它们。 当序列化函数时,保存这些类型的字段仍为null
,并且仅在函数被函数反序列化后设置。
-
在Scala中,您可以简单地使用惰性字段,例如
lazy val x = new NonSerializableType()
。NonSerializableType
类型实际上只在第一次访问变量x
创建,变量x
通常在worker上。 因此,类型可以是不可序列化的,因为当函数序列化为运送给工作者时,x
为null。 -
在Java中,如果将其设置为Rich Function ,则可以在函数的
open()
方法上初始化非可序列化字段。 丰富的函数(如RichMapFunction
)是基本函数的扩展版本(此处为MapFunction
),并允许您访问生命周期方法,如open()
和close()
。
懒惰依赖注射
我不太熟悉dependency injection,但是dagger似乎也提供了类似懒惰的依赖,这可能有助于解决方法,就像Scala中的惰性变量一样:
new MapFunction() { @Inject Lazy dep; public Long map(Long value) { return dep.get().doSomething(value); } }