
So far I have been able to retrieve data from MongoDB using mongo-hadoop-core 1.4.2. The data I want to manipulate is an array of values inside an embedded document inside each document of the collection I am querying, and I need those values as Doubles. The data retrieved from the collection has the type RDD[(Object, org.bson.BSONObject)], which means every document is a tuple of type (Object, org.bson.BSONObject).
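
For reference, this is roughly how the documents RDD below is produced (a minimal sketch assuming the standard newAPIHadoopRDD setup; the connection URI is a placeholder, not my actual one):

import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

val mongoConfig = new Configuration()
// placeholder URI; points mongo-hadoop at the collection to read
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mydb.mycollection")

val documents = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[MongoInputFormat], // splits the collection into Hadoop input splits
  classOf[Object],           // key: the document's _id
  classOf[BSONObject])       // value: the document itself
// documents: org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)]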

Whenever I want to get at the embedded documents, this is what I do (working on spark-shell 1.5.1):

import com.mongodb.{BasicDBObject, BasicDBList} // classes I am using here.    

// 'documents' already taken from collection.
scala> documents
res4: org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:32

// getting one document.
scala> val doc = documents.take(1)(0) 
doc: (Object, org.bson.BSONObject) = ( ... _id fields ... , ... lots of fields ...)

// getting an embedded document from the tuple's second element.
scala> val samples = doc._2.get("samp") match {case x: BasicDBObject => x}
samples: com.mongodb.BasicDBObject = (... some fields ...)

// getting an embedded document.
scala> val latency = samples.get("latency") match {case x: BasicDBObject => x}
latency: com.mongodb.BasicDBObject = { "raw" : [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9] , "xtrf" : { "...

// getting a BSON array.
scala> val array = latency.get("raw") match {case x: BasicDBList => x}
array: com.mongodb.BasicDBList =  [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9]

Casting from Object to BasicDBObject is really inconvenient, but I need it in order to use get(key: String). I could also use .asInstanceOf[BasicDBObject] instead of match {case x: BasicDBObject => x}, but is there a better way? Is there an inherited method in BasicBSONObject that gets a specific type directly, like Double, Int, String or Date?
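
To cut the repetition, one option would be a small generic helper built on ClassTag (a hypothetical sketch, not code from my actual session):

import scala.reflect.ClassTag
import com.mongodb.{BasicDBObject, BasicDBList}

// Pattern-matches against the runtime class carried by the ClassTag,
// failing with a readable message instead of a bare ClassCastException.
def as[A](value: AnyRef)(implicit tag: ClassTag[A]): A = value match {
  case tag(a) => a
  case other  => sys.error(s"expected ${tag.runtimeClass.getName}, got ${other.getClass.getName}")
}

val samples = as[BasicDBObject](doc._2.get("samp"))
val latency = as[BasicDBObject](samples.get("latency"))
val raw     = as[BasicDBList](latency.get("raw"))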

As for BasicDBList, there is a get(key: String) method, inherited from BasicBSONList, that returns an Object which can be cast to Double, but only with .asInstanceOf[Double]; and there is a toArray() method, inherited from java.util.ArrayList, that returns an Array of Objects which I cannot map to Double, even though that is exactly what I am trying here: .map(_.asInstanceOf[Double])

scala> val arrayOfDoubles = array.toArray.map(_.asInstanceOf[Double])
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC.<init>(<console>:50)
at $iwC$$iwC.<init>(<console>:52)
at $iwC.<init>(<console>:54)
at <init>(<console>:56)
at .<init>(<console>:60)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

But sometimes it does work: on some documents this cast works, while on others it doesn't and prints the error message above. Could this be a problem in the data structure coming from MongoDB, or from Spark, but only in those documents? Smaller arrays, with around 30 values, seem to always work.
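
To narrow this down, one check that could be run in the shell is to look at which runtime classes the elements actually have (a hypothetical diagnostic, not from my session):

scala> array.toArray.map(_.getClass.getName).distinct
// if java.lang.Integer shows up here alongside java.lang.Double,
// the failing .asInstanceOf[Double] casts would be explained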

My solution so far is this inefficient conversion:

scala> val arrayOfDoubles = array.toArray.map(_.toString.toDouble)
arrayOfDoubles: Array[Double] = Array(9.71, 8.77, 10.16, 9.49, 8.54, 10.29, 9.55, 9.16, 10.78, 10.31, 9.54, 10.69, 10.33, 9.58, 9.07, 9.72, 9.48, 8.72, 10.59, 9.81, 9.31, 10.64, 9.87, 9.29, 10.38, 9.64, 8.86, 10.84, 10.06, 9.29, 8.45, 9.08, 7.55, 9.75, 9.05, 10.38, 9.64, 8.25, 10.27, 9.54, 8.52, 10.26, 9.53, 7.87, 9.76, 9.02, 10.27, 7.93, 9.73, 9.0, 10.07, 9.35, 7.66, 13.68, 11.92, 14.72, 14.0, 12.55, 11.77, 11.02, 11.59, 10.87, 10.4, 9.13, 10.28, 9.55, 10.43, 8.33, 9.66, 8.93, 8.05, 11.26, 10.53, 9.81, 10.2, 9.42, 7.73, 9.76, 9.04, 8.29, 9.34, 7.21, 10.05, 9.32, 10.28, 8.59, 10.15, 9.53, 7.88, 9.9, 9.15, 13.96, 13.19, 11.0, 13.6, 13.01, 12.17, 11.39, 10.64, 9.9)

Am I missing something here, or are things really this inconvenient? Why do all these methods have to return Object or BSONObject? Is there any way to overcome this problem I found? Where does this java.lang.Integer come from, if there are no integers in the array, to be cast to Double?


1 Answer


First of all, if you haven't already, I would suggest you take a look at casbah.

To answer your question: if you import the Java conversions:

import scala.collection.JavaConversions._

you should be able to map over the collection directly, without calling toArray. If your array contains Doubles or Integers, you can cast the elements to Number and get the double value, like this:

array.map(_.asInstanceOf[Number].doubleValue)
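
With the conversions in scope, the BasicDBList (a java.util.ArrayList under the hood) is implicitly wrapped as a Scala Buffer, so roughly the following should work in the shell (result types shown for illustration):

import scala.collection.JavaConversions._

val doubles = array.map(_.asInstanceOf[Number].doubleValue)
// doubles: scala.collection.mutable.Buffer[Double]
val arrayOfDoubles = doubles.toArray // if an Array[Double] is needed downstream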

I don't know what your data source looks like, but given that you occasionally get an Integer where you expect a Double, it is likely that whole decimal numbers (e.g. 11.0) are being stored as integers (e.g. 11).
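
This is easy to verify in isolation with hand-made values (hypothetical data, mirroring the suspected mix of boxed types):

val mixed: Array[AnyRef] = Array(Double.box(9.71), Int.box(11))

// java.lang.Double and java.lang.Integer both extend java.lang.Number,
// so doubleValue converts either one without a ClassCastException:
val converted = mixed.map(_.asInstanceOf[Number].doubleValue)
// converted: Array[Double] = Array(9.71, 11.0)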

Answered 2015-12-17T10:48:05.413