我正在将我的 Spark 作业从本地笔记本电脑提交到远程独立 Spark 集群 (spark://IP:7077)。提交成功。但是,我没有得到任何输出,一段时间后它失败了。当我检查集群上的工作节点 (worker) 时,我发现以下异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
当我在本地系统 (local[*]) 上运行相同的代码时,它会成功运行并给出输出。
请注意,我是在 spark notebook 中运行它的。当我在终端中通过 spark-submit 提交同一个应用程序时,它能在远程独立集群上成功运行。
我在笔记本的配置中遗漏了什么吗?还有其他可能的原因吗?
代码非常简单。
详细异常:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
at akka.actor.ActorRef.tell(ActorRef.scala:125)
at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44)
at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
示例代码
// Counts the lines of an HDFS text file that contain "hello" and "hi" on a
// standalone Spark cluster, then prints both counts.
//
// Executors connect back to the driver at the address the driver advertises.
// The reported failure shows them looking up
// akka.tcp://sparkDriver@localhost:<port>, i.e. the driver advertised
// "localhost", which a remote worker resolves to itself. Setting
// spark.driver.host to an address the workers can reach avoids that.
val logFile = "hdfs://hostname/path/to/file"
// Placeholder (like "hostname" above): hostname or IP of the machine running
// this driver, as resolvable and reachable from the cluster workers.
val driverHost = "driver-hostname-or-ip"
val conf = new SparkConf()
  .setMaster("spark://hostname:7077") // as appears on hostname:8080
  .setAppName("myapp")
  .set("spark.driver.host", driverHost) // advertise a routable address, not localhost
  .set("spark.executor.memory", "20G")
  .set("spark.cores.max", "40")
  .set("spark.executor.cores", "20")
  // NOTE(review): presumably needed because the notebook already owns a
  // SparkContext in this JVM -- confirm.
  .set("spark.driver.allowMultipleContexts", "true")
val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
// Each count() is a separate action over logData.
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))