Failing integration test for Apache Spark Streaming

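I've been trying to track down a problem with some unit/integration tests I've written for an Apache Spark project.

With Spark 1.1.1 my tests pass. When I tried upgrading to 1.4.0 (I also tried 1.4.1), the tests started failing.

I've managed to reduce the code needed to reproduce the problem to the small integration test below.

Interestingly, if I comment out the @RunWith annotation on the test, it passes. Obviously I don't need @RunWith for this reduced test, but the real tests use mocking fairly extensively, so I'd rather not have to stop using PowerMock.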

    package com.example;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    public class SampleTest {

        @Before
        public void setup() throws Exception {
            SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
            JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
        }

        @Test
        public void exampleTest() {
        }
    }

Here is the exception I'm seeing:

    java.io.IOException: failure to login
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:796)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:842)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:133)
        at com.example.SampleTest.setup(SampleTest.java:19)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.internal.runners.MethodRoadie.runBefores(MethodRoadie.java:133)
        at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
        at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
        at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
        at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
        at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
        at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
        at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
        at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
        at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
    Caused by: javax.security.auth.login.LoginException: Can't find user name
        at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:197)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:784)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$5.run(LoginContext.java:721)
        at javax.security.auth.login.LoginContext$5.run(LoginContext.java:719)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokeCreatorPriv(LoginContext.java:718)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:591)
        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:771)
        ... 38 more

The versions of the various dependencies are as follows:

  • hadoop-client 2.6
  • apache spark 1.4.0 / 1.4.1
  • junit 4.12
  • easymock 3.31
  • power mock 1.6.2

I've tried various versions of Spark. The test above passes with the following versions:

  • 1.1.1
  • 1.2.2

It starts failing from Spark 1.3.0 onwards.

Any ideas what I need to change to get this working?

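Looking at the SparkContext source, the exception is thrown by the line that tries to get the current user name. In version 1.2 there was a fallback default, SparkContext.SPARK_UNKNOWN_USER, which did not require a currently logged-in user: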

    // Set SPARK_USER for user who is running SparkContext.
    val sparkUser = Option {
      Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
    }.getOrElse {
      SparkContext.SPARK_UNKNOWN_USER
    }
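To make the 1.2 behaviour concrete, here is a minimal Java sketch of the same lookup order: SPARK_USER, then the user.name system property, then a fixed default. It is an illustration only, not Spark's code. The class LegacySparkUser, its resolve method and the "<unknown>" placeholder are hypothetical stand-ins, and the environment and system properties are passed in as functions so each fallback can be exercised without touching the real process environment. The point it shows is that no step consults a login subsystem, so nothing can throw.

```java
import java.util.function.Function;

// Hypothetical illustration of the Spark 1.2 lookup order; not Spark's actual code.
public class LegacySparkUser {

    // Assumed placeholder standing in for SparkContext.SPARK_UNKNOWN_USER.
    public static final String UNKNOWN_USER = "<unknown>";

    // env and props are injected so each fallback can be tested in isolation.
    public static String resolve(Function<String, String> env, Function<String, String> props) {
        // 1. Explicit override via the SPARK_USER environment variable.
        String user = env.apply("SPARK_USER");
        // 2. Otherwise the JVM's user.name system property.
        if (user == null) {
            user = props.apply("user.name");
        }
        // 3. Otherwise a fixed default; no login subsystem is involved, so nothing throws.
        return user != null ? user : UNKNOWN_USER;
    }

    public static void main(String[] args) {
        // Uses the real process environment and system properties.
        System.out.println(resolve(System::getenv, System::getProperty));
    }
}
```

Calling resolve(System::getenv, System::getProperty), as main does, reproduces the lookup against the real process values.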

The code introduced in version 1.3 no longer has a default user, which is why you didn't get this error with earlier versions:

    // Set SPARK_USER for user who is running SparkContext.
    val sparkUser = Utils.getCurrentUserName()

This calls the following code in Utils:

    /**
     * Returns the current user name. This is the currently logged in user, unless that's been
     * overridden by the `SPARK_USER` environment variable.
     */
    def getCurrentUserName(): String = {
      Option(System.getenv("SPARK_USER"))
        .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
    }

If you set the environment variable SPARK_USER, you should be able to avoid the branch into UserGroupInformation that causes the exception.
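The short-circuit works because Scala's Option.getOrElse takes its default by name, so the UserGroupInformation call only runs when SPARK_USER is unset, and there is no further fallback if it fails. The Java sketch below mimics that shape under stated assumptions: CurrentUserSketch and LoginUserLookup are hypothetical names, LoginUserLookup stands in for UserGroupInformation.getCurrentUser().getShortUserName(), and the failing lookup simulates the "failure to login" IOException seen in the stack trace.

```java
import java.io.IOException;
import java.util.function.Function;

// Hypothetical sketch of the Spark 1.3+ lookup order; not Spark's actual code.
public class CurrentUserSketch {

    // Stand-in for UserGroupInformation.getCurrentUser().getShortUserName(),
    // which may throw (e.g. "failure to login") and has no default.
    @FunctionalInterface
    public interface LoginUserLookup {
        String shortUserName() throws IOException;
    }

    public static String getCurrentUserName(Function<String, String> env, LoginUserLookup login)
            throws IOException {
        String sparkUser = env.apply("SPARK_USER");
        // The login lookup is only invoked when SPARK_USER is absent, like a by-name getOrElse.
        return sparkUser != null ? sparkUser : login.shortUserName();
    }

    public static void main(String[] args) throws IOException {
        LoginUserLookup failingLogin = () -> { throw new IOException("failure to login"); };

        // SPARK_USER set: the failing lookup is never called.
        System.out.println(getCurrentUserName(k -> "SPARK_USER".equals(k) ? "tester" : null, failingLogin));
        // prints "tester"

        // SPARK_USER unset: the IOException propagates, as in the stack trace.
        try {
            getCurrentUserName(k -> null, failingLogin);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
            // prints "caught: failure to login"
        }
    }
}
```

Since a running JVM cannot portably change its own environment, SPARK_USER would in practice need to be set wherever the tests are launched, for example in the Eclipse JUnit run configuration that appears in the trace, or in the build tool's test settings.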

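UserGroupInformation is a Hadoop security class, and it looks like running under PowerMock prevents it from working properly: the root cause in the trace is a LoginException ("Can't find user name") thrown from UserGroupInformation$HadoopLoginModule.commit.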