
I have a VMware Cloudera image running CDH 5.7 on CentOS 6.8. I use OS X as my development machine and the CDH image to run the code.

UPDATE

This is the build.sbt I am currently using; I have just changed the Spark version from the official one (1.6.1) to 1.6.0-cdh5.7.0:

[cloudera@quickstart awesome-recommendation-engine]$ cat build.sbt 
name := "my-recommendation-spark-engine"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.4"

val sparkVersion = "1.6.0-cdh5.7.0"

val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.10" % "0.8.1"
      exclude("javax.jms", "jms")
      exclude("com.sun.jdmk", "jmxtools")
      exclude("com.sun.jmx", "jmxri"),
   // HTTP client to request data to Amazon
   "net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
   // HTML parser
   "org.jodd" % "jodd-lagarto" % "3.5.2",
   "com.typesafe" % "config" % "1.2.1",
   "com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
   "org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
   "org.twitter4j" % "twitter4j-core" % "4.0.2",
   "org.twitter4j" % "twitter4j-stream" % "4.0.2",
   "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
   "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0-cdh5.7.0",
   "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.0",
   "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.0",
   "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.0",
   "org.apache.spark" % "spark-mllib_2.10" % "1.6.0-cdh5.7.0",
   "com.google.code.gson" % "gson" % "2.6.2",
   "commons-cli" % "commons-cli" % "1.3.1",
   "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
   // Akka
   "com.typesafe.akka" %% "akka-actor" % akkaVersion,
   "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
   // MongoDB
   "org.reactivemongo" %% "reactivemongo" % "0.10.0"
)

packAutoSettings

resolvers ++= Seq(
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
  "Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
  Resolver.sonatypeRepo("public")
)
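As an aside (not part of the original build): when a packed job runs on a cluster that already ships Spark, the Spark modules are often given the `provided` scope so that the CDH jars on the cluster are used instead of bundled copies. A sketch of what that would look like here:

```scala
// Sketch only: mark the cluster-supplied Spark modules as "provided"
// so the packed classpath does not conflict with the CDH jars.
val sparkVersion = "1.6.0-cdh5.7.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10"            % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming_2.10"       % sparkVersion % "provided",
  "org.apache.spark" % "spark-sql_2.10"             % sparkVersion % "provided",
  "org.apache.spark" % "spark-mllib_2.10"           % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVersion // needed at runtime, not on CDH
)
```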

This is the /etc/hosts file in the cdh image, which contains a line like this:

127.0.0.1       quickstart.cloudera     quickstart      localhost       localhost.domain

The Cloudera version I am running is:

[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties

# Autogenerated build properties
version=2.6.0-cdh5.7.0
git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1
cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76
cloudera.base-branch=cdh5-base-2.6.0
cloudera.build-branch=cdh5-2.6.0_5.7.0
cloudera.pkg.version=2.6.0+cdh5.7.0+1280
cloudera.pkg.release=1.cdh5.7.0.p0.92
cloudera.cdh.release=cdh5.7.0
cloudera.build.time=2016.03.23-18:30:29GMT

I can run an ls command on the vmware machine:

[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv
-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv

And I can read its content:

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l
568454

The code is very simple; it just tries to map its content:

val ratingFile="hdfs://192.168.30.139:8020/user/cloudera/ratings.csv"
//where 192.168.30.139 is the eth0 assigned ip of cloudera image
case class AmazonRating(userId: String, productId: String, rating: Double)

val NumRecommendations = 10
val MinRecommendationsPerUser = 10
val MaxRecommendationsPerUser = 20
val MyUsername = "myself"
val NumPartitions = 20

println("Using this ratingFile: " + ratingFile)
  // first create an RDD out of the rating file
val rawTrainingRatings = sc.textFile(ratingFile).map {
    line =>
      val Array(userId, productId, scoreStr) = line.split(",")
      AmazonRating(userId, productId, scoreStr.toDouble)
}

// only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size  && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()

println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}")

I get this message:

**06/01/2016 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources**
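As a side check (not in the original post): the grouping/filtering predicate itself can be verified on a plain Scala collection, with no Spark cluster involved, which rules the logic out as the cause. A minimal sketch, with the thresholds lowered and made-up toy data:

```scala
// Plain-Scala sanity check of the same keep-users-in-range logic,
// runnable without any Spark cluster (thresholds lowered for the toy data).
case class AmazonRating(userId: String, productId: String, rating: Double)

val MinRecommendationsPerUser = 2
val MaxRecommendationsPerUser = 4

val raw = Seq(
  AmazonRating("u1", "p1", 5.0), AmazonRating("u1", "p2", 3.0), AmazonRating("u1", "p3", 4.0),
  AmazonRating("u2", "p1", 1.0), // only one rating -> filtered out
  AmazonRating("u3", "p1", 2.0), AmazonRating("u3", "p2", 2.0)
)

// same predicate as the RDD version: Min <= size && size < Max
val kept = raw.groupBy(_.userId)
  .filter { case (_, rs) => MinRecommendationsPerUser <= rs.size && rs.size < MaxRecommendationsPerUser }
  .values.flatten.toSeq

println(s"Kept ${kept.size} ratings out of ${raw.size}")
// -> Kept 5 ratings out of 6 (u2 is dropped, u1 and u3 survive)
```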

UPDATE

06/02/2016: I have increased the memory (8 GB) and the available cores (4) of the vmware image, but the same exception as above still happens. The file that I am trying to load from HDFS is only 16 MB, so it cannot be a matter of available resources!

If I update the /etc/hosts file with this line:

192.168.30.139 quickstart.cloudera  quickstart  localhost   localhost.domain

instead of the current one:

[cloudera@quickstart bin]$ cat /etc/hosts
127.0.0.1   quickstart.cloudera quickstart  localhost   localhost.domain 

where 192.168.30.139 is the actual assigned IP, I get this exception:

Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1

This is puzzling, because if I run the exact same code within spark-shell, I get this message:

Parsed hdfs://192.168.30.139:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454

Why does it run fine in spark-shell but not when I run it programmatically in the vmware image?

UPDATE

I am running the code using the sbt-pack plugin to generate unix commands, and I run those commands inside the vmware image that hosts the Spark pseudo-cluster.

This is the code I use to instantiate the SparkConf:

val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
                               .setMaster("spark://192.168.30.139:7077")
                               .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpoint dir should come from a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)
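One detail worth noting (an observation about the snippet above, not a verified fix): `new StreamingContext(sparkConf, Seconds(2))` creates a second SparkContext from the same conf, which is the only reason `spark.driver.allowMultipleContexts` is needed at all. The usual single-context construction is a sketch like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: reuse the one SparkContext for streaming instead of
// allowing multiple contexts in the same driver JVM.
val sparkConf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.30.139:7077")
val sc  = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2)) // wraps the existing context
```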

I think it must be a misconfiguration in some Cloudera configuration file, but which one?

UPDATE 2 06/01/2016

OK, using the IP (192.168.30.139) instead of the fully qualified name (quickstart.cloudera) makes the previous exception go away, but now this warning shows up:

**16/06/01 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources**

If I run these commands:

[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-master status
Spark master is running                                    [  OK  ]
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-worker status
Spark worker is running                                    [  OK  ]

I can see that spark-master and spark-worker are running, but when I check 192.168.30.139:18081, the web page that shows the spark-worker status, I see:

URL: spark://192.168.30.139:7077
REST URL: spark://192.168.30.139:6066 (cluster mode)
Alive Workers: 1
Cores in use: 4 Total, 0 Used
Memory in use: 6.7 GB Total, 0.0 B Used
Applications: 0 Running, 4 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Workers

Worker Id   Address State   Cores   Memory
worker-20160602181029-192.168.30.139-7078

I do not know what else to do; I have already given the vmware image as many resources as I can, and the same error keeps happening...

16/06/02 18:32:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
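For context (an assumption about what could be tried, not something from the original post): this warning typically means the application is asking for more cores or memory per executor than the single worker can grant, or the worker cannot connect back to the driver. One low-risk experiment is to request strictly less than the worker advertises; the property names are standard Spark 1.6 settings, the values are guesses for this 4-core / 6.7 GB worker:

```scala
import org.apache.spark.SparkConf

// Sketch: claim less than the worker's advertised 4 cores / 6.7 GB,
// so the standalone master has room to schedule the job.
val sparkConf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.30.139:7077")
  .set("spark.cores.max", "2")        // do not claim all 4 cores
  .set("spark.executor.memory", "1g") // well under the 6.7 GB available
```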

Thank you very much for reading this far.
