Why does Spark fail in local mode with "Failed to get broadcast_0_piece0 of broadcast_0"?

I'm running this snippet to sort an RDD of points by their distance to a given point and take the K nearest ones:

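    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def getKNN(sparkContext: SparkContext, k: Int, point2: Array[Double],
               pointsRDD: RDD[Array[Double]]): RDD[Array[Double]] = {
      // Pair every point with its distance to point2
      val tuplePointDistanceRDD: RDD[(Double, Array[Double])] =
        pointsRDD.map(point => (DistanceUtils.euclidianDistance(point, point2), point))
      // Sort by distance, keep the k closest points and turn them back into an RDD
      sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
    }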

I use only one SparkContext in my application and pass it to my functions as a parameter. Even so, I get org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 when I call sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k)) to get the KNN points of point2.

I build the sparkContext with this snippet:

 var sparkContext = new SparkContext("local", "") 

What could be causing this error?
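Two tentative observations. First, the trace places the failure inside sortBy (via RDD.sortBy$default$3). That is where Spark computes the partition count of the underlying HadoopRDD, which requires reading its broadcast Hadoop configuration. This fits the idea that pointsRDD was created by an earlier SparkContext that has since been stopped. Second, apart from the error, the sort-then-take step can be written with RDD.takeOrdered, which returns the k smallest elements without a full distributed sort. Below is a minimal sketch. The name euclideanDistance is a hypothetical stand-in for the asker's DistanceUtils.euclidianDistance, and KnnSketch is an illustrative name:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object KnnSketch {
  // Hypothetical stand-in for the asker's DistanceUtils.euclidianDistance.
  def euclideanDistance(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Returns the k points of pointsRDD closest to `query`.
  // takeOrdered brings only the k smallest (distance, point) pairs back to
  // the driver instead of sorting the whole RDD first.
  def getKNN(sc: SparkContext, k: Int, query: Array[Double],
             pointsRDD: RDD[Array[Double]]): RDD[Array[Double]] = {
    val nearest: Array[Array[Double]] = pointsRDD
      .map(p => (euclideanDistance(p, query), p))
      .takeOrdered(k)(Ordering.by[(Double, Array[Double]), Double](_._1))
      .map(_._2)
    sc.parallelize(nearest)
  }
}
```

takeOrdered still runs a job on pointsRDD. It would therefore hit the same exception if that RDD belongs to a stopped context. It only lowers the cost of the KNN step.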

Here is the log from my standalone Spark environment, including the stack trace of this error:

    15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731]
    15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731.
    15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker
    15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster
    15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1
    15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
    15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3
    15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server
    15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732.
    15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator
    15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040
    15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost
    15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733.
    15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733
    15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager
    15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733)
    15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager
    15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0
    org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.RDD.sortBy$default$3(RDD.scala:548)
        at LOF$.getKNN(LOF.scala:14)
        at LOF$.lof(LOF.scala:25)
        at BehaviourActivityScoreJudgeTest$$anonfun$1.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14)
        at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
        at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
        at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
        at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
        at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
        at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
        at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
        at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
        at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
        at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
        at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
        at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
        at org.scalatest.Suite$class.run(Suite.scala:1424)
        at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
        at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
        at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
        at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
        at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4)
        at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
        at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
        at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
        at org.scalatest.tools.Runner$.run(Runner.scala:883)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
    Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
        ... 94 more
    15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
    15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler
    15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared
    15/12/24 11:55:30 INFO BlockManager: BlockManager stopped
    15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

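I just found out why I was getting this exception: my SparkContext object was being started and stopped several times across ScalaTest methods. Once I fixed that, everything worked normally.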
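Below is a minimal sketch of that kind of fix, assuming ScalaTest 3.1 or later (for AnyFunSuite). The trait name SharedSparkContext and the example suite are hypothetical. The suite creates a single SparkContext before its first test and stops it after its last one, so no test works with an RDD whose context has already been stopped:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical mix-in: one SparkContext per suite, created before the first
// test and stopped after the last one, instead of once per test method.
trait SharedSparkContext extends BeforeAndAfterAll { this: Suite =>
  @transient private var _sc: SparkContext = _
  def sc: SparkContext = _sc

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName(suiteName))
  }

  override protected def afterAll(): Unit = {
    try {
      if (_sc != null) _sc.stop()
      _sc = null
    } finally super.afterAll()
  }
}

// Example suite: both tests run against the same, still-running context.
class SharedContextExampleSuite extends AnyFunSuite with SharedSparkContext {
  test("count uses the shared context") {
    assert(sc.parallelize(1 to 10).count() == 10L)
  }
  test("reduce uses the same context") {
    assert(sc.parallelize(1 to 4).reduce(_ + _) == 10)
  }
}
```

The important part is that start and stop happen once per suite, not once per test. Any RDD built inside a test is then guaranteed to belong to a live context.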

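I ran into the same problem. After a lot of googling, I found that I had written a singleton class for SparkContext initialization. A singleton is only unique within a single JVM. With Spark, though, that singleton class gets invoked from every worker node, each running in its own JVM, which leads to multiple SparkContext objects.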
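A related point: the SparkContext exists only on the driver and is not serializable. It therefore must never be reachable from a closure passed to an RDD transformation. A driver-side object often leaks into a closure through "this", when a lambda reads a field of its enclosing class. Below is a minimal sketch with a hypothetical Scaler class. It contrasts a closure that captures "this" with one that captures only a local copy of the value it needs:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical driver-side helper that holds the SparkContext. It is
// deliberately NOT Serializable: the context must stay on the driver.
class Scaler(val sc: SparkContext, val factor: Double) {

  // `factor` here means `this.factor`, so the closure captures the whole
  // Scaler instance (and its SparkContext); Spark rejects it as not serializable.
  def scaleCapturingThis(rdd: RDD[Double]): RDD[Double] =
    rdd.map(x => x * factor)

  // Copying the field into a local val first means the closure captures
  // only a Double, so nothing driver-only is shipped to the executors.
  def scaleWithLocalCopy(rdd: RDD[Double]): RDD[Double] = {
    val f = factor
    rdd.map(x => x * f)
  }
}
```

The first method fails as soon as map is called, because Spark checks that the closure is serializable and reports "Task not serializable". The second method runs normally.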

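I got this error too. I haven't seen any concrete code examples yet, so I'll share my solution. It cleared the error for me, although I suspect this problem may have more than one solution. Still, it's worth a try, because it leaves everything else in your code as it is.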

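It looks as if the SparkContext was being shut down, which triggered the error. I think the problem was that the SparkContext was created in one class that other classes then extended, and the extension caused it to close, which is a bit annoying. Below is the implementation I used to clear this error.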

Spark initialization class:

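    import org.apache.spark.{SparkConf, SparkContext}

    class Spark extends Serializable {
      // Class-level lazy vals: the context is built once per instance, on first use.
      // (Declared as local lazy vals inside getContext, they would build a new
      // SparkContext on every call.)
      @transient lazy val conf: SparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("test")

      @transient lazy val sc: SparkContext = {
        val context = new SparkContext(conf)
        context.setLogLevel("OFF")
        context
      }

      def getContext: SparkContext = sc
    }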

Main class:

    import org.apache.spark.rdd.RDD

    object Test extends Spark {
      def main(args: Array[String]): Unit = {
        val sc = getContext
        val irisRDD: RDD[String] = sc.textFile("...")
        // ...
      }
    }

Then extend your other classes with the Spark class, and everything should work.

I got this error while running a LogisticRegression model, so I think this should fix it for you and for other machine-learning libraries too.

This helped me, because a SparkContext had already been created:

 val sc = SparkContext.getOrCreate() 

Before that, I had tried this:

    val conf = new SparkConf()
      .setAppName("Testing")
      .setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true")
    val sc = new SparkContext(conf)

but it broke when I ran

  spark.createDataFrame(rdd, schema) 
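getOrCreate helps because it returns the SparkContext already running in the JVM and creates a new one only if none exists. Code that asks for a context repeatedly therefore keeps getting the same instance. SparkSession.builder().getOrCreate() reuses that running context as well. Below is a minimal sketch, assuming Spark 2.x or later for SparkSession. The object name and the two-column schema are illustrative. As far as I know, spark.driver.allowMultipleContexts was removed in Spark 3.0, so it is not a lasting workaround either:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object GetOrCreateSketch {
  def run(): Long = {
    val conf = new SparkConf().setAppName("Testing").setMaster("local")
    // Creates the context on the first call; later calls return that same context.
    val sc = SparkContext.getOrCreate(conf)
    // The session builder also picks up the already-running SparkContext.
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Illustrative schema and rows for createDataFrame(rdd, schema).
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))
    val rdd = sc.parallelize(Seq(Row(1, "a"), Row(2, "b")))
    spark.createDataFrame(rdd, schema).count()
  }
}
```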

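Related to the answers above: I hit this problem when I inadvertently serialized a data-connector query (specifically, the Cassandra connection driver) out to a Spark slave. That then spun off its own SparkContext, and within 4 seconds the whole application crashed.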
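A general sketch of the usual remedy, not specific to the Cassandra connector, whose API is not shown here: never capture a driver-side connection or session object in a closure. Instead, open the resource inside mapPartitions or foreachPartition, so it is created on the executor that runs the task. FakeConnection and PerPartitionWrite are hypothetical names standing in for a real database client:

```scala
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

// Hypothetical, non-serializable client standing in for a real database driver.
class FakeConnection(url: String) {
  private val buffer = ArrayBuffer.empty[String]
  def write(record: String): Unit = buffer += record
  def written: Int = buffer.size
  def close(): Unit = ()
}

object PerPartitionWrite {
  // Opens one connection per partition, inside the task, instead of capturing
  // a driver-side connection in the closure; only the String `url` is shipped.
  // Returns the number of records each partition wrote.
  def writeAll(rdd: RDD[String], url: String): Array[Int] =
    rdd.mapPartitions { records =>
      val conn = new FakeConnection(url) // created where the task runs
      try {
        records.foreach(conn.write)      // consume the iterator before closing
        Iterator.single(conn.written)
      } finally conn.close()
    }.collect()
}
```

In a real job that only writes, foreachPartition is the more natural choice because nothing needs to come back to the driver. mapPartitions is used here only so the per-partition result can be inspected.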