如何使用Google DataProc Java Client在相关的GS存储桶中使用jar文件和类提交spark作业?

我需要触发Spark Jobs以使用API​​调用从JSON文件聚合数据。 我使用spring-boot来创建资源。 因此,解决方案的步骤如下:

  1. 用户使用json文件作为输入发出POST请求
  2. JSON文件存储在与数据中心群集关联的Google存储桶中。
  3. 在REST方法中使用指定的jar,类和参数触发聚合spark作业是json文件链接。

我希望使用Dataproc的Java Client而不是控制台或命令行来触发作业。 你怎么做呢?

我们希望很快就会在官方文档中提供更全面的指南,但要开始使用,请访问以下API概述: https : //developers.google.com/api-client-library/java/apis/dataproc/v1

它包括Dataproc javadocs的链接; 如果您的服务器代表您自己的项目而不是代表您的最终用户的Google项目进行调用,那么您可能希望在此解释基于密钥文件的服务帐户身份validation,以创建用于初始化DataprocCredential对象客户端存根。

至于dataproc特定的部分,这只是意味着如果使用Maven,则将以下依赖项添加到Maven pomfile:

    com.google.apis google-api-services-dataproc v1-rev4-1.21.0    

然后你会得到如下代码:

 Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential) .setApplicationName("my-webabb/1.0") .build(); dataproc.projects().regions().jobs().submit( projectId, "global", new SubmitJobRequest() .setJob(new Job() .setPlacement(new JobPlacement() .setClusterName("my-spark-cluster")) .setSparkJob(new SparkJob() .setMainClass("FooSparkJobMain") .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar")) .setArgs(ImmutableList.of( "arg1", "arg2", "arg3"))))) .execute(); 

由于不同的中间服务器可能会进行低级重试,或者您的请求可能会抛出IOException而您不知道作业提交是否成功,您可能想要采取的一个额外步骤是生成您自己的jobId ; 然后你知道什么jobId要进行轮询以确定它是否已经提交,即使你的请求超时或抛出一些未知的exception:

 import java.util.UUID; ... Dataproc dataproc = new Dataproc.Builder(new NetHttpTransport(), new JacksonFactory(), credential) .setApplicationName("my-webabb/1.0") .build(); String curJobId = "json-agg-job-" + UUID.randomUUID().toString(); Job jobSnapshot = null; try { jobSnapshot = dataproc.projects().regions().jobs().submit( projectId, "global", new SubmitJobRequest() .setJob(new Job() .setReference(new JobReference() .setJobId(curJobId)) .setPlacement(new JobPlacement() .setClusterName("my-spark-cluster")) .setSparkJob(new SparkJob() .setMainClass("FooSparkJobMain") .setJarFileUris(ImmutableList.of("gs://bucket/path/to/your/spark-job.jar")) .setArgs(ImmutableList.of( "arg1", "arg2", "arg3"))))) .execute(); } catch (IOException ioe) { try { jobSnapshot = dataproc.projects().regions().jobs().get( projectId, "global", curJobId).execute(); logger.info(ioe, "Despite exception, job was verified submitted"); } catch (IOException ioe2) { // Handle differently; if it's a GoogleJsonResponseException you can inspect the error // code, and if it's a 404, then it means the job didn't get submitted; you can add retry // logic in that case. } } // We can poll on dataproc.projects().regions().jobs().get(...) until the job reports being // completed or failed now.