我应该将变量保留为瞬态变量吗?
我一直在试验Apache Spark试图解决一些查询,如top-k,skyline等。
我创建了一个包装SparkConf
和名为SparkContext
的包装器。 这个类也实现了serializable,但由于SparkConf
和JavaSparkContext
不是可序列化的,所以类也不是。
我有一个类解决名为TopK
的topK查询,该类实现了serializable,但该类还有一个不可序列化的SparkContext
成员变量(由于上述原因)。 因此,每当我尝试从RDD中的.reduce()
函数中执行TopK
方法时,我都会收到exception。
我发现的解决方案是使SparkContext
瞬态化。
我的问题是:我应该将SparkContext
变量保持为瞬态还是我犯了一个大错误?
SparkContext
类:
import java.io.Serializable; import org.apache.spark.SparkConf; import org.apache.spark.api.java.*; public class SparkContext implements Serializable { private final SparkConf sparConf; // this is not serializable private final JavaSparkContext sparkContext; // this is not either protected SparkContext(String appName, String master) { this.sparConf = new SparkConf(); this.sparConf.setAppName(appName); this.sparConf.setMaster(master); this.sparkContext = new JavaSparkContext(sparConf); } protected JavaRDD textFile(String path) { return sparkContext.textFile(path); } }
TopK
课程:
public class TopK implements QueryCalculator, Serializable { private final transient SparkContext sparkContext; . . . }
抛出Task not serializable
exception的示例。 getBiggestPointByXDimension
甚至不会被输入,因为为了在reduce函数中执行它,包含它的类( TopK
)必须是可序列化的。
private Point findMedianPoint(JavaRDD points) { Point biggestPointByXDimension = points.reduce((a, b) -> getBiggestPointByXDimension(a, b)); . . . } private Point getBiggestPointByXDimension(Point first, Point second) { return first.getX() > second.getX() ? first : second; }
问题: 我应该将SparkContext变量保持为瞬态吗?
是。 没关系。 它只封装了(Java)SparkContext,并且上下文在worker上不可用,因此将其标记为transient
只是告诉Serializer不要序列化该字段。
您也可以拥有自己的SparkContext
包装器,而不是可序列化的,并将其标记为瞬态 – 与上述效果相同。 (顺便说一下,鉴于SparkContext是spark上下文的Scala类名,我选择了另一个名称来避免混乱。)
还有一件事:正如你所指出的,Spark试图序列化完整的封闭类的原因是因为在封闭中使用了类的方法。 避免这样! 使用匿名类或自包含的闭包(最后将转换为匿名类)。
- 使用Java的Spark作业服务器
- 如何在GroupBy操作后从spark DataFrame列中收集字符串列表?
- 使用Spark从Azure Blob读取数据
- 使用sc.textFile以递归方式从子目录中获取文件内容
- Apache Spark需要5到6分钟才能从Cassandra中简单计算1亿行
- sparkContext JavaSparkContext SQLContext SparkSession之间的区别?
- 使用saveAsTextFile的Spark NullPointerException
- Spark运行时错误:spark.metrics.sink.MetricsServlet无法实例化
- Spark spark-submit –jars参数需要逗号列表,如何声明jar的目录?