任务不可序列化 – Spark Java

我在Spark中得到Task不可序列化的错误。 我已经搜索并尝试使用某些post中建议的静态函数,但它仍然会出现相同的错误。

代码如下:

public class Rating implements Serializable { private SparkSession spark; private SparkConf sparkConf; private JavaSparkContext jsc; private static Function mapFunc; public Rating() { mapFunc = new Function() { public Rating call(String str) { return Rating.parseRating(str); } }; } public void runProcedure() { sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local"); jsc = new JavaSparkContext(sparkConf); SparkSession spark = SparkSession.builder().master("local").appName("Word Count") .config("spark.some.config.option", "some-value").getOrCreate(); JavaRDD ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt") .javaRDD() .map(mapFunc); } public static void main(String[] args) { Rating newRating = new Rating(); newRating.runProcedure(); } } 

错误给出: 在此处输入图像描述

我该如何解决这个错误? 提前致谢。

显然, Rating不能是Serializable ,因为它包含对Spark结构(即SparkSessionSparkConf等)的引用作为属性。

这里的问题在于

 JavaRDD ratingsRD = spark.read().textFile("sample_movielens_ratings.txt") .javaRDD() .map(mapFunc); 

如果你看一下mapFunc的定义,你就会返回一个Rating对象。

 mapFunc = new Function() { public Rating call(String str) { return Rating.parseRating(str); } }; 

此函数在map内部使用(Spark术语中的转换 )。 由于转换直接执行到工作节点而不是驱动程序节点,因此它们的代码必须是可序列化的。 这迫使Spark尝试序列化Rating类,但这是不可能的。

尝试从Rating提取您需要的function,并将它们放在不具有任何Spark结构的其他类中。 最后,使用这个新类作为mapFunc函数的返回类型。

此外,您必须确保不在您的类中包含非序列化变量,如JavaSparkContextSparkSession 。 如果你需要包含它们,你应该这样定义:

 private transient JavaSparkContext sparkCtx; private transient SparkSession spark; 

祝你好运。