解决Apache Spark中的依赖性问题

构建和部署Spark应用程序时的常见问题是:

  • java.lang.ClassNotFoundException
  • object x is not a member of package y编译错误object x is not a member of package y
  • java.lang.NoSuchMethodError

如何解决这些问题?

Apache Spark的类路径是动态构建的(以适应每个应用程序的用户代码),这使得它易受此类问题的攻击。 @ user7337271的答案是正确的,但还有一些问题,具体取决于你正在使用的集群管理器 (“master”)。

首先,Spark应用程序由这些组件组成(每个组件都是一个单独的JVM,因此可能在其类路径中包含不同的类):

  1. 驱动程序 :这是您的应用程序创建SparkSession (或SparkContext )并连接到集群管理器以执行实际工作
  2. Cluster Manager :作为集群的“入口点”,负责为每个应用程序分配执行程序。 Spark支持几种不同的类型:独立,YARN和Mesos,我们将在下面描述。
  3. 执行者 :这些是集群节点上的进程,执行实际工作(运行Spark 任务

Apache Spark的集群模式概述在此图中描述了它们之间的关系:

群集模式概述

现在 – 哪些类应该驻留在每个组件中?

这可以通过下图解答:

课程安排概述

我们慢慢解析一下:

  1. Spark Code是Spark的库。 它们应该存在于所有三个组件中,因为它们包含让Spark执行它们之间通信的粘合剂。 顺便说一句 – Spark作者做出了一个设计决定,要求在所有组件中包含所有组件的代码(例如,包括只应在驱动程序中的Executor中运行的代码)以简化这一过程 – 所以Spark的“胖jar”(版本高达1.6) )或“归档”(在2.0中,详细信息如下)包含所有组件的必要代码,并且应该在所有组件中都可用。

  2. 仅驱动程序代码这是用户代码,不包括应在Executors上使用的任何内容,即RDD / DataFrame / Dataset上的任何转换中未使用的代码。 这不一定必须与分布式用户代码分开,但也可以。

  3. 分布式代码这是用驱动程序代码编译的用户代码,但也必须在执行程序上执行 – 实际转换使用的所有内容都必须包含在此jar中。

既然我们已经做到了这一点, 那么我们如何让每个组件中的类正确加载,以及它们应遵循哪些规则?

  1. Spark代码 :如前所述,您必须在所有组件中使用相同的ScalaSpark版本。

    1.1在独立模式下,应用程序(驱动程序)可以连接“预先存在的”Spark安装。 这意味着所有驱动程序必须使用在主服务器和执行程序上运行的相同Spark版本

    1.2在YARN / Mesos中 ,每个应用程序可以使用不同的Spark版本,但同一应用程序的所有组件必须使用相同的版本。 这意味着如果您使用版本X来编译和打包驱动程序应用程序,则应在启动SparkSession时提供相同的版本(例如,在使用YARN时通过spark.yarn.archivespark.yarn.jars参数)。 您提供的jar / archive应该包括所有Spark依赖项( 包括传递依赖项 ),并且在应用程序启动时,集群管理器会将其提供给每个执行程序。

  2. 驱动程序代码 :完全取决于 – 驱动程序代码可以作为一堆jar子或“胖jar”发货,只要它包含所有Spark依赖项+所有用户代码

  3. 分布式代码 :除了存在于驱动程序之外,还必须将此代码发送给执行程序(同样,还有它的所有传递依赖项)。 这是使用spark.jars参数完成的。

总而言之 ,这是建立和部署Spark应用程序的建议方法(在本例中为使用YARN):

  • 使用分布式代码创建一个库,将其打包为“常规”jar(带有描述其依赖项的.pom文件)和“胖jar”(包含所有传递依赖项)。
  • 创建一个驱动程序应用程序,在分布式代码库和Apache Spark上使用编译依赖项(具有特定版本)
  • 将驱动程序应用程序打包到胖jar中以部署到驱动程序
  • 在启动SparkSession时,将正确版本的分布式代码作为spark.jars参数的值SparkSession
  • 将包含下载的Spark二进制文件的lib/文件夹下的所有jar的归档文件(例如gzip)的位置作为spark.yarn.archive的值spark.yarn.archive

构建和部署Spark应用程序时,所有依赖项都需要兼容版本。

  • Scala版本 。 所有包都必须使用相同的主要(2.10,2.11,2.12)Scala版本。

    考虑以下(不正确的) build.sbt

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    我们对Scala 2.10使用spark-streaming ,而其余​​的包用于Scala 2.11。 有效的文件可能是

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.11" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    但最好全局指定版本并使用%%

     name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-streaming" % "2.0.1", "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1" ) 

    同样在Maven中:

      com.example simple-project 4.0.0 Simple Project jar 1.0  2.0.1     org.apache.spark spark-core_2.11 ${spark.version}   org.apache.spark spark-streaming_2.11 ${spark.version}   org.apache.bahir spark-streaming-twitter_2.11 ${spark.version}    
  • Spark版本所有软件包都必须使用相同的主要Spark版本(1.6,2.0,2.1,…)。

    考虑以下(不正确的)build.sbt:

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    我们使用spark-core 1.6,而其余组件都在Spark 2.0中。 有效的文件可能是

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    但最好使用变量:

     name := "Simple Project" version := "1.0" val sparkVersion = "2.0.1" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion, "org.apache.spark" % "spark-streaming_2.10" % sparkVersion, "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion ) 

    同样在Maven中:

      com.example simple-project 4.0.0 Simple Project jar 1.0  2.0.1 2.11     org.apache.spark spark-core_${scala.version} ${spark.version}   org.apache.spark spark-streaming_${scala.version} ${spark.version}   org.apache.bahir spark-streaming-twitter_${scala.version} ${spark.version}    
  • Spark依赖项中使用的Spark版本必须匹配Spark安装的Spark版本。 例如,如果在群集上使用1.6.1,则必须使用1.6.1来构建jar。 不一定接受次要版本不匹配。

  • 用于构建jar的Scala版本必须与用于构建部署Spark的Scala版本相匹配。 默认情况下(可下载的二进制文件和默认构建):

    • Spark 1.x – > Scala 2.10
    • Spark 2.x – > Scala 2.11
  • 如果包含在胖jar中,则应该可以在工作节点上访问其他包。 有很多选择,包括:

    • --jars参数spark-submit – 分发本地jar文件。
    • --packages spark-submit --packages参数 – 从Maven存储库获取依赖项。

    在集群节点中提交时,您应该在--jars包含应用程序jar

除了user7337271已经给出的非常广泛的答案之外,如果问题是由于缺少外部依赖性而导致的,那么您可以使用maven程序集插件构建一个带有依赖关系的jar

在这种情况下,请确保在构建系统中将所有核心spark依赖项标记为“已提供”,并且如前所述,确保它们与运行时spark版本相关联。

应用程序的依赖项类应在启动命令的application-jar选项中指定。

更多细节可以在Spark文档中找到

取自文档:

application-jar:包含应用程序和所有依赖项的捆绑jar的路径。 URL必须在群集内部全局可见,例如,hdfs://路径或所有节点上存在的file://路径

我认为这个问题必须解决一个程序集插件。 你需要建一个胖jar子。 例如在sbt中:

  • 使用代码addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")添加文件$PROJECT_ROOT/project/assembly.sbt addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • to build.sbt added some libraries libraryDependencies ++ = Seq(“com.some.company”%%“some-lib”%“1.0.0”)
  • 在sbt控制台中输入“assembly”,并部署程序集jar

如果您需要更多信息,请访问https://github.com/sbt/sbt-assembly