Spark – 任务不可序列化:如何使用调用外部类/对象的复杂映射闭包?

看看这个问题: Scala + Spark – 任务不可序列化:java.io.NotSerializableExceptionon。 当只在类而不是对象上调用闭包外的函数时 。

问题:

假设我的映射器可以是函数(def),它在内部调用其他类并创建对象并在其中执行不同的操作。 (或者它们甚至可以是扩展(Foo)=> Bar的类并在其apply方法中进行处理 – 但是现在让我们忽略这种情况)

Spark仅支持用于闭包的Java Serialization。 有没有办法解决这个问题? 我们可以用东西而不是封闭来做我想做的事吗? 我们可以使用Hadoop轻松完成这类工作。 这一点让Spark几乎无法使用。 人们不能指望所有第三方库都将所有类扩展为Serializable!

可能的解决方案:

这样的事情似乎有用吗: https : //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

它看起来似乎是一个包装器的答案,但我不知道究竟是怎么回事。

我想出了自己如何做到这一点!

您只需要在通过闭包之前序列化对象,然后反序列化。 即使您的类不是Serializable,这种方法也可以正常工作,因为它在幕后使用Kryo。 你需要的只是一些咖喱。 ;)

这是我如何做到的一个例子:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } } 

随意使Blah像你想要的那样复杂,类,伴随对象,嵌套类,对多个第三方库的引用。

KryoSerializationWrapper参考: https : //github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

如果使用Java API,则在传递给映射函数闭包时应避免使用匿名类。 你不需要做map(新函数),而是需要一个扩展你的函数的类并将它传递给map(..)参见: https : //yanago.wordpress.com/2015/03/21/apache-spark/