在PySpark中运行自定义Java类

我正在尝试在PySpark中运行自定义HDFS阅读器类。 这个类是用Java编写的,我需要从PySpark访问它,无论是从shell还是使用spark-submit。

在PySpark中,我从SparkContext( sc._gateway )中检索sc._gateway

说我有一节课:

 package org.foo.module public class Foo { public int fooMethod() { return 1; } } 

我试图将它打包到jar中并使用--jar选项传递给pyspark,然后运行:

 from py4j.java_gateway import java_import jvm = sc._gateway.jvm java_import(jvm, "org.foo.module.*") foo = jvm.org.foo.module.Foo() 

但我得到错误:

 Py4JError: Trying to call a package. 

有人可以帮忙吗? 谢谢。

您所描述的问题通常表明org.foo.module不在驱动程序CLASSPATH上。 一种可能的解决方案是使用spark.driver.extraClassPath添加jar文件。 例如,它可以在conf/spark-defaults.conf设置,也可以作为命令行参数提供。

旁注:

  • 如果您使用的类是自定义输入格式,则不需要使用Py4j网关。 您只需使用SparkContext.hadoop* / SparkContext.newAPIHadoop*方法即可。

  • 使用java_import(jvm, "org.foo.module.*")看起来不错。 一般来说,应该避免在JVM上进行不必要的导入。 这不是公开的原因,你真的不想搞砸。 特别是当您以一种使此导入完全过时的方式访问时。 所以jvm.org.foo.module.Foo() java_import并坚持使用jvm.org.foo.module.Foo()

在PySpark中尝试以下操作

 from py4j.java_gateway import java_import java_import(sc._gateway.jvm,"org.foo.module.Foo") func = sc._gateway.jvm.Foo() func.fooMethod() 

确保已将Java代码编译为可运行的jar并提交spark作业

 spark-submit --driver-class-path "name_of_your_jar_file.jar" --jars "name_of_your_jar_file.jar" name_of_your_python_file.py 

而不是--jars你应该使用--packages将包导入你的spark-submit动作。