0

我正在将我的 Spark 作业从本地笔记本电脑提交到远程独立 Spark 集群 (spark://IP:7077)。提交成功。但是,我没有得到任何输出,一段时间后它失败了。当我检查集群上的工作人员时,我发现以下异常:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]

当我在本地系统 (local[*]) 上运行相同的代码时,它会成功运行并给出输出。

请注意,我在 spark notebook 中运行它。当我通过终端使用相同的应用程序提交它时,它在远程独立集群上成功运行spark-submit

我在笔记本的配置中遗漏了什么吗?还有其他可能的原因吗?

代码非常简单。

详细异常:

Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Actor[akka.tcp://sparkDriver@localhost:54561/]/user/CoarseGrainedScheduler]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:512)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:545)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:535)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:91)
    at akka.actor.ActorRef.tell(ActorRef.scala:125)
    at akka.dispatch.Mailboxes$$anon$1$$anon$2.enqueue(Mailboxes.scala:44)
    at akka.dispatch.QueueBasedMessageQueue$class.cleanUp(Mailbox.scala:438)
    at akka.dispatch.UnboundedDequeBasedMailbox$MessageQueue.cleanUp(Mailbox.scala:650)
    at akka.dispatch.Mailbox.cleanUp(Mailbox.scala:309)
    at akka.dispatch.MessageDispatcher.unregister(AbstractDispatcher.scala:204)
    at akka.dispatch.MessageDispatcher.detach(AbstractDispatcher.scala:140)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:203)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

示例代码

val logFile = "hdfs://hostname/path/to/file"
val conf = new SparkConf() 
.setMaster("spark://hostname:7077") // as appears on hostname:8080
.setAppName("myapp")
.set("spark.executor.memory", "20G")
.set("spark.cores.max", "40")
.set("spark.executor.cores","20")
.set("spark.driver.allowMultipleContexts","true")

val sc2 = new SparkContext(conf)
val logData = sc2.textFile(logFile)
val numAs = logData.filter(line => line.contains("hello")).count()
val numBs = logData.filter(line => line.contains("hi")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
4

2 回答 2

1

更新:

可以通过在应用程序代码中包含驱动程序的 IP 地址(即本地笔记本电脑的公共 IP)来避免上述问题。这可以通过在 spark 上下文中添加以下行来完成:

.set("spark.driver.host",YourSystemIPAddress)

但是,如果驱动程序的 IP 地址位于 NAT 后面,则可能会出现问题。在这种情况下,工作人员将无法找到 IP。

于 2016-01-21T19:09:38.347 回答
0

当您说“spark notebook”时,我假设您的意思是 github 项目https://github.com/andypetrella/spark-notebook

我将不得不查看笔记本的细节,但我注意到您的工作人员正在尝试连接到“本地主机”上的主机。

对于正常的 Spark 配置,在 $SPARK_HOME/conf/spark-env.sh 中设置 SPARK_MASTER_IP 并查看是否有帮助,即使您在单台机器上以独立模式运行,也请设置它。根据我的经验,Spark 并不总是能正确解析主机名,因此从所有 IP 的基线开始是个好主意。

其余的是一般信息,看看它是否有助于解决您的具体问题:

如果您从笔记本电脑提交到集群,您可以使用 --deploy-mode 集群来告诉您的驱动程序在其中一个工作节点上运行。这会额外考虑如何设置类路径,因为您不知道驱动程序将在哪个工作人员上运行。

为了完整起见,这里有一些一般信息,有一个关于主机名解析为 IP 地址的已知 Spark 错误。我并不是在所有情况下都将其作为完整的答案,但我建议尝试使用仅使用所有 IP 的基线,并且仅使用单个配置 SPARK_MASTER_IP。仅通过这两种做法,我就可以让我的集群正常工作,而所有其他配置或使用主机名似乎都把事情搞砸了。

因此,在您的 spark-env.sh 中摆脱 SPARK_LOCAL_IP 并将 SPARK_MASTER_IP 更改为 IP 地址,而不是主机名。

我在这个答案中更详细地处理了这个问题。

为了更完整,这是该答案的一部分:

你能 ping 通 Spark master 运行的盒子吗?你能从主人那里ping通工人吗?更重要的是,你能从 master box 对 worker 进行无密码 ssh 吗?根据 1.5.2 文档,您需要能够使用私钥执行此操作,并将工作人员输入到 conf/slaves 文件中。我在最后复制了相关段落。

您可能会遇到工作人员可以联系主人但主人无法回到工作人员的情况,因此看起来没有建立连接。检查两个方向。我认为主节点上的从属文件和无密码 ssh 可能会导致与您所看到的类似的错误。

根据我交联的答案,还有一个旧错误,但尚不清楚该错误是如何解决的。

于 2016-01-14T16:24:44.913 回答