我正在尝试使用 MongoDB Connector for Hadoop with Spark 来查询 MongoDB 中的一个集合,并将检索到的所有文档插入到另一个集合中。MongoUpdateWritable 类用于 RDD 的值以更新 MongoDB 中的集合,它有一个 upsert 标志。不幸的是, upsert 标志似乎对执行没有影响。代码执行时没有错误,就好像 upsert 标志设置为 false 一样。
此(Scala)代码连接到本地主机 mongod 进程,使用 mongo 客户端写入一些数据,然后尝试读取该数据并使用 spark 将其写入同一数据库中的另一个集合。在该写入未通过后,代码通过具有相同 ID 的 mongo 客户端将文档写入目标表并运行相同的 spark 作业,以显示 upsert 的更新部分正常工作。
火花版本:1.6.0-cdh5.7.0
hadoop 版本:2.6.0-cdh5.4.7
蒙戈版本:3.2.0
mongo-hadoop-core 版本:2.0.2
import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase}
import com.mongodb.{BasicDBObject, DBCollection, MongoClient}
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.bson.{BSONObject, BasicBSONObject, Document}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object sparkTest extends App {
//setting up mongo
val mongo: MongoDatabase = new MongoClient("localhost",27017).getDatabase("test")
var source: MongoCollection[Document] = mongo.getCollection("source")
val target: MongoCollection[Document] = mongo.getCollection("target")
source.drop()
target.drop()
//inserting document
val sourceDoc = new Document()
sourceDoc.put("unchanged","this field should not be changed")
sourceDoc.put("_id","1")
source.insertOne(sourceDoc)
//setting up spark
val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local")
val mongoConfig = new Configuration()
val sc = new SparkContext(conf)
mongoConfig.set("mongo.input.uri",
"mongodb://localhost:27017/test.source")
mongoConfig.set("mongo.output.uri",
"mongodb://localhost:27017/test.target")
//setting up read
val documents = sc.newAPIHadoopRDD(
mongoConfig, // Configuration
classOf[MongoInputFormat], // InputFormat
classOf[Object], // Key type
classOf[BSONObject]) // Value type
//building updates with no document matching the query in the target collection
val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
(value: BSONObject) => {
val query = new BasicBSONObject
query.append("_id", value.get("_id").toString)
val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
update.append("added","this data will be added")
println("val:"+value.toString)
println("query:"+query.toString)
println("update:"+update.toString)
new MongoUpdateWritable(
query, // Query
update, // Update
true, // Upsert flag
false, // Update multiple documents flag
true // Replace flag
)}
)
//saving updates
upsert_insert_rdd.saveAsNewAPIHadoopFile(
"",
classOf[Object],
classOf[MongoUpdateWritable],
classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
mongoConfig)
// At this point, there should be a new document in the target database, but there is not.
val count = target.count()
println("count after insert: "+count+", expected: 1")
//adding doc to display working update. This code will throw an exception if there is a
//document with a matching _id field in the collection, so if this breaks that means the upsert worked!
val targetDoc = new Document()
targetDoc.put("overwritten","this field should not be changed")
targetDoc.put("_id","1")
target.insertOne(targetDoc)
//building updates when a document matching the query exists in the target collection
val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
(value: BSONObject) => {
val query = new BasicBSONObject
query.append("_id", value.get("_id").toString)
val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
update.append("added","this data will be added")
println("val:"+value.toString)
println("query:"+query.toString)
println("update:"+update.toString)
new MongoUpdateWritable(
query, // Query
update, // Update
true, // Upsert flag
false, // Update multiple documents flag
true // Replace flag
)}
)
//saving updates
upsert_update_rdd.saveAsNewAPIHadoopFile(
"",
classOf[Object],
classOf[MongoUpdateWritable],
classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
mongoConfig)
//checking that the update succeeded. should print:
//contains new field:true, contains overwritten field:false
val ret = target.find().first
if (ret != null)
println("contains new field:"+ret.containsKey("added")+", contains overwritten field:"+ret.containsKey("overwritten"))
else
println("no documents found in target")
}
对我所缺少的任何见解都会有所帮助。我尝试将输出格式更改为 MongoUpdateWritable 但这对行为没有影响。我知道这可能是一个配置问题,但它似乎是 mongo hadoop 适配器的一个错误,因为使用它们的输入和输出格式编写文档并且 MongoUpdateWritable 类确实可以成功地读写文档。
POM 为方便起见:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>spark_mongo_upsert_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>1.6.0-cdh5.7.0</spark.version>
<mongo.version>3.2.0</mongo.version>
<mongo.hadoop.version>2.0.2</mongo.hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.mongodb.</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongo.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Plugin to compile Scala code -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
</plugin>
</plugins>
</build>
</project>