为什么SparkSession为一个动作执行两次?

最近升级到Spark 2.0,我在尝试从JSON字符串创建一个简单的数据集时看到了一些奇怪的行为。 这是一个简单的测试用例:

SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD rdd = sc.parallelize(Arrays.asList( "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}", "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}" )); JavaRDD mappedRdd = rdd.map(json -> { System.out.println("mapping json: " + json); return json; }); Dataset data = spark.read().json(mappedRdd); data.show(); 

并输出:

 mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]} mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]} +----+--------------------+--------+ |name| roles| title| +----+--------------------+--------+ | tom|[designer, develo...|engineer| |jack| [designer, manager]| cto| +----+--------------------+--------+ 

似乎“map”函数正在执行两次,即使我只执行一个动作。 我认为Spark会懒惰地构建一个执行计划,然后在需要时执行它,但这似乎为了将数据读取为JSON并对其执行任何操作,计划必须至少执行两次。

在这个简单的情况下它并不重要,但是当map函数长时间运行时,这就成了一个大问题。 这是对的,还是我错过了什么?

这是因为您没有为DataFrameReader提供架构。 因此,Spark必须急切地扫描数据集以推断输出模式。

由于mappedRdd未缓存,因此将对其进行两次评估:

  • 一次用于模式推断
  • 一旦你调用data.show

如果你想阻止你应该为读者提供架构(Scala语法):

 val schema: org.apache.spark.sql.types.StructType = ??? spark.read.schema(schema).json(mappedRdd)