任务不可序列化 – Spark Java
我在Spark中得到Task不可序列化的错误。 我已经搜索并尝试使用某些post中建议的静态函数,但它仍然会出现相同的错误。
代码如下:
public class Rating implements Serializable { private SparkSession spark; private SparkConf sparkConf; private JavaSparkContext jsc; private static Function mapFunc; public Rating() { mapFunc = new Function() { public Rating call(String str) { return Rating.parseRating(str); } }; } public void runProcedure() { sparkConf = new SparkConf().setAppName("Filter Example").setMaster("local"); jsc = new JavaSparkContext(sparkConf); SparkSession spark = SparkSession.builder().master("local").appName("Word Count") .config("spark.some.config.option", "some-value").getOrCreate(); JavaRDD ratingsRDD = spark.read().textFile("sample_movielens_ratings.txt") .javaRDD() .map(mapFunc); } public static void main(String[] args) { Rating newRating = new Rating(); newRating.runProcedure(); } }
错误给出:
我该如何解决这个错误? 提前致谢。
显然, Rating
不能是Serializable
,因为它包含对Spark结构(即SparkSession
, SparkConf
等)的引用作为属性。
这里的问题在于
JavaRDD ratingsRD = spark.read().textFile("sample_movielens_ratings.txt") .javaRDD() .map(mapFunc);
如果你看一下mapFunc
的定义,你就会返回一个Rating
对象。
mapFunc = new Function() { public Rating call(String str) { return Rating.parseRating(str); } };
此函数在map
内部使用(Spark术语中的转换 )。 由于转换直接执行到工作节点而不是驱动程序节点,因此它们的代码必须是可序列化的。 这迫使Spark尝试序列化Rating
类,但这是不可能的。
尝试从Rating
提取您需要的function,并将它们放在不具有任何Spark结构的其他类中。 最后,使用这个新类作为mapFunc
函数的返回类型。
此外,您必须确保不在您的类中包含非序列化变量,如JavaSparkContext和SparkSession 。 如果你需要包含它们,你应该这样定义:
private transient JavaSparkContext sparkCtx; private transient SparkSession spark;
祝你好运。
- 线程“main”中的exceptionorg.apache.spark.SparkException:此JVM中只能运行一个SparkContext(参见SPARK-2243)
- 如何在spark数据框中展平结构?
- 如何更新火花流中的广播变量?
- Spark与Cassandra输入/输出
- Java,Spark和Cassandra java.lang.ClassCastException:com.datastax.driver.core.DefaultResultSetFuture无法转换为阴影
- 如何在Java中的Apache Spark中将DataFrame转换为Dataset?
- 在Spark Web UI中看不到完成的作业
- 用spark分析日志文件?
- 将JavaPairRDD转换为JavaRDD