I have a VMware Cloudera image running CDH 5.7 on CentOS 6.8. I use OS X as my development machine and the CDH image to run the code.
Update
This is the build.sbt I am currently using; I have just switched the Spark version from the official release (1.6.1) to 1.6.0-cdh5.7.0:
[cloudera@quickstart awesome-recommendation-engine]$ cat build.sbt
name := "my-recommendation-spark-engine"
version := "1.0-SNAPSHOT"
scalaVersion := "2.10.4"
val sparkVersion = "1.6.0-cdh5.7.0"
val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
// HTTP client to request data to Amazon
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
// HTML parser
"org.jodd" % "jodd-lagarto" % "3.5.2",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
"org.twitter4j" % "twitter4j-core" % "4.0.2",
"org.twitter4j" % "twitter4j-stream" % "4.0.2",
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-mllib_2.10" % "1.6.0-cdh5.7.0",
"com.google.code.gson" % "gson" % "2.6.2",
"commons-cli" % "commons-cli" % "1.3.1",
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
// Akka
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
// MongoDB
"org.reactivemongo" %% "reactivemongo" % "0.10.0"
)
packAutoSettings
resolvers ++= Seq(
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Akka Repository" at "http://repo.akka.io/releases/",
"Twitter4J Repository" at "http://twitter4j.org/maven2/",
"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
"Twitter Maven Repo" at "http://maven.twttr.com/",
"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
Resolver.sonatypeRepo("public")
)
This is the /etc/hosts file on the CDH image; it contains the following line:
127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain
The Cloudera version I am running is:
[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties
# Autogenerated build properties
version=2.6.0-cdh5.7.0
git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1
cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76
cloudera.base-branch=cdh5-base-2.6.0
cloudera.build-branch=cdh5-2.6.0_5.7.0
cloudera.pkg.version=2.6.0+cdh5.7.0+1280
cloudera.pkg.release=1.cdh5.7.0.p0.92
cloudera.cdh.release=cdh5.7.0
cloudera.build.time=2016.03.23-18:30:29GMT
I can run an ls command on the VMware machine:
[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv
-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv
I can also read its contents:
[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l
568454
The code is straightforward; it just tries to map the file's contents:
val ratingFile="hdfs://192.168.30.139:8020/user/cloudera/ratings.csv"
//where 192.168.30.139 is the eth0 assigned ip of cloudera image
case class AmazonRating(userId: String, productId: String, rating: Double)
val NumRecommendations = 10
val MinRecommendationsPerUser = 10
val MaxRecommendationsPerUser = 20
val MyUsername = "myself"
val NumPartitions = 20
println("Using this ratingFile: " + ratingFile)
// first create an RDD out of the rating file
val rawTrainingRatings = sc.textFile(ratingFile).map {
line =>
val Array(userId, productId, scoreStr) = line.split(",")
AmazonRating(userId, productId, scoreStr.toDouble)
}
// only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()
println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}")
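As a side note on the parsing step above: `val Array(userId, productId, scoreStr) = line.split(",")` throws a MatchError inside the Spark task if any line does not have exactly three comma-separated fields. A hypothetical, more defensive variant of that parser (plain Scala, no Spark needed to try it) could look like this:

```scala
import scala.util.Try

// Same case class as in the snippet above.
case class AmazonRating(userId: String, productId: String, rating: Double)

// Parse one CSV line into an AmazonRating, returning None for malformed
// lines instead of throwing inside the Spark task.
def parseRating(line: String): Option[AmazonRating] =
  line.split(",") match {
    case Array(userId, productId, scoreStr) =>
      Try(scoreStr.toDouble).toOption.map(AmazonRating(userId, productId, _))
    case _ => None
  }
```

With this in place the RDD could be built as `sc.textFile(ratingFile).flatMap(parseRating)`, silently dropping bad lines; `parseRating` is an illustrative name, not part of the original code.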
And I get this message:
**06/01/2016 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources**
UPDATE
06/02/2016: I have increased the memory (8 GB) and available cores (4) of the VMware image, but the same exception as above still happens. The file I am trying to load from HDFS is only 16 MB, so it cannot be a matter of available resources!
If I update the /etc/hosts file with this line:
192.168.30.139 quickstart.cloudera quickstart localhost localhost.domain
instead of
[cloudera@quickstart bin]$ cat /etc/hosts
127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain
where 192.168.30.139 is the actual assigned IP, I get this exception:
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
Yet if I run the exact same code within spark-shell, I get this message:
Parsed hdfs://192.168.30.139:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454
Why does it work fine in spark-shell but not when run programmatically on the VMware image?
Update
I am running the code with the sbt-pack plugin, which generates unix commands that I then run on the VMware image, where Spark runs as a pseudo-cluster.
This is the code I use to instantiate the SparkConf:
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.30.139:7077")
  .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpointdir should be in a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)
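One detail worth noting in the snippet above: `new StreamingContext(sparkConf, Seconds(2))` constructs a second SparkContext from the same conf, which is presumably why `spark.driver.allowMultipleContexts` had to be enabled. A sketch of the same setup reusing the single existing context (untested here, since it needs a live Spark master) would be:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.30.139:7077")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

// Build the StreamingContext on top of the existing SparkContext rather
// than the conf, so only one SparkContext exists in the driver and
// spark.driver.allowMultipleContexts is no longer needed.
val ssc = new StreamingContext(sc, Seconds(2))
```

Whether this relates to the resource warning is an open question, but it removes one moving part from the driver setup.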
I suspect this must be a misconfiguration in one of the Cloudera configuration files, but which one?
Update 2 06/01/2016
OK, using the IP (192.168.30.139) instead of the fully qualified name (quickstart.cloudera) makes the previous exception go away, but now this warning appears:
**16/06/01 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources**
If I run these commands:
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-master status
Spark master is running [ OK ]
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-worker status
Spark worker is running [ OK ]
I can see that spark-master and spark-worker are running, but when I open 192.168.30.139:18081, the web page that shows the spark-worker status, I see:
URL: spark://192.168.30.139:7077
REST URL: spark://192.168.30.139:6066 (cluster mode)
Alive Workers: 1
Cores in use: 4 Total, 0 Used
Memory in use: 6.7 GB Total, 0.0 B Used
Applications: 0 Running, 4 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Workers
Worker Id Address State Cores Memory
worker-20160602181029-192.168.30.139-7078
I do not know what else to do; I have already given the VMware image as many resources as I can, and the same error keeps happening...
16/06/02 18:32:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Thank you very much for reading this far.