
I am trying to use the MongoDB Connector for Hadoop with Spark to query one collection in MongoDB and insert every retrieved document into another collection. The MongoUpdateWritable class is used as the value of the RDD in order to update a collection in MongoDB, and it has an upsert flag. Unfortunately, the upsert flag appears to have no effect on execution. The code runs without errors, as if the upsert flag were set to false.

This (Scala) code connects to a local mongod process, writes some data with the mongo client, then tries to read that data back and write it with Spark into another collection in the same database. After that write fails to go through, the code inserts a document with the same _id into the target collection via the mongo client and runs the same Spark job again, to show that the update half of the upsert works correctly.

Spark version: 1.6.0-cdh5.7.0

Hadoop version: 2.6.0-cdh5.4.7

Mongo version: 3.2.0

mongo-hadoop-core version: 2.0.2

import com.mongodb.client.{FindIterable, MongoCollection, MongoDatabase}
import com.mongodb.{BasicDBObject, DBCollection, MongoClient}
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.bson.{BSONObject, BasicBSONObject, Document}
import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object sparkTest extends App {

  //setting up mongo
  val mongo: MongoDatabase = new MongoClient("localhost",27017).getDatabase("test")
  var source: MongoCollection[Document] = mongo.getCollection("source")
  val target: MongoCollection[Document] = mongo.getCollection("target")
  source.drop()
  target.drop()
  //inserting document
  val sourceDoc = new Document()
  sourceDoc.put("unchanged","this field should not be changed")
  sourceDoc.put("_id","1")
  source.insertOne(sourceDoc)

  //setting up spark
  val conf = new SparkConf().setAppName("test mongo with spark").setMaster("local")
  val mongoConfig = new Configuration()
  val sc = new SparkContext(conf)
  mongoConfig.set("mongo.input.uri",
    "mongodb://localhost:27017/test.source")
  mongoConfig.set("mongo.output.uri",
    "mongodb://localhost:27017/test.target")

  //setting up read
  val documents = sc.newAPIHadoopRDD(
    mongoConfig,                // Configuration
    classOf[MongoInputFormat],  // InputFormat
    classOf[Object],            // Key type
    classOf[BSONObject])        // Value type

  //building updates with no document matching the query in the target collection
  val upsert_insert_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
    (value: BSONObject) => {

      val query = new BasicBSONObject
      query.append("_id", value.get("_id").toString)

      val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
      update.append("added","this data will be added")

      println("val:"+value.toString)
      println("query:"+query.toString)
      println("update:"+update.toString)

      new MongoUpdateWritable(
        query,  // Query
        update,  // Update
        true,  // Upsert flag
        false,   // Update multiple documents flag
        true  // Replace flag
      )}
  )
  //saving updates
  upsert_insert_rdd.saveAsNewAPIHadoopFile(
    "",
    classOf[Object],
    classOf[MongoUpdateWritable],
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
    mongoConfig)

  // At this point, there should be a new document in the target database, but there is not.
  val count = target.count()
  println("count after insert: "+count+", expected: 1")

  //adding doc to display working update. This code will throw an exception if there is a
  //document with a matching _id field in the collection, so if this breaks that means the upsert worked!
  val targetDoc = new Document()
  targetDoc.put("overwritten","this field should not be changed")
  targetDoc.put("_id","1")
  target.insertOne(targetDoc)

  //building updates when a document matching the query exists in the target collection
  val upsert_update_rdd: RDD[(Object, MongoUpdateWritable)] = documents.mapValues(
    (value: BSONObject) => {

      val query = new BasicBSONObject
      query.append("_id", value.get("_id").toString)

      val update = new BasicBSONObject(value.asInstanceOf[BasicBSONObject])
      update.append("added","this data will be added")

      println("val:"+value.toString)
      println("query:"+query.toString)
      println("update:"+update.toString)

      new MongoUpdateWritable(
        query,  // Query
        update,  // Update
        true,  // Upsert flag
        false,   // Update multiple documents flag
        true  // Replace flag
      )}
  )
  //saving updates
  upsert_update_rdd.saveAsNewAPIHadoopFile(
    "",
    classOf[Object],
    classOf[MongoUpdateWritable],
    classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
    mongoConfig)

  //checking that the update succeeded. should print:
  //contains new field:true, contains overwritten field:false
  val ret = target.find().first
  if (ret != null)
    println("contains new field:"+ret.containsKey("added")+", contains overwritten field:"+ret.containsKey("overwritten"))
  else
    println("no documents found in target")
}

Any insight into what I am missing would be helpful. I tried changing the output format to MongoUpdateWritable, but that had no effect on the behavior. I know this could be a configuration issue, but it looks like a bug in the mongo-hadoop adapter to me, since reading and writing documents through its input and output formats and the MongoUpdateWritable class otherwise works fine.
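
For reference, one possible workaround would be to bypass the Hadoop output format and upsert each RDD element directly with the Java driver that the setup code already uses. This is only a sketch, assuming the same localhost test.target collection as above, not the mongo-hadoop path under test:

import com.mongodb.MongoClient
import com.mongodb.client.model.UpdateOptions
import org.bson.{BasicBSONObject, Document}

documents.foreachPartition { iter =>
  // one client per partition; database/collection names follow the setup code above
  val client = new MongoClient("localhost", 27017)
  val coll = client.getDatabase("test").getCollection("target")
  try {
    iter.foreach { case (_, value) =>
      val doc = new Document(value.asInstanceOf[BasicBSONObject].toMap
        .asInstanceOf[java.util.Map[String, AnyRef]])
      doc.put("added", "this data will be added")
      // replaceOne with upsert = true inserts the document when nothing matches the query
      coll.replaceOne(new Document("_id", value.get("_id").toString),
        doc, new UpdateOptions().upsert(true))
    }
  } finally {
    client.close()
  }
}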

The POM, for convenience:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>spark_mongo_upsert_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>1.6.0-cdh5.7.0</spark.version>
        <mongo.version>3.2.0</mongo.version>
        <mongo.hadoop.version>2.0.2</mongo.hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb.mongo-hadoop</groupId>
            <artifactId>mongo-hadoop-core</artifactId>
            <version>${mongo.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>${mongo.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Plugin to compile Scala code -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
            </plugin>
        </plugins>
    </build>

</project>

1 Answer


Saving a Dataset that contains an _id field to MongoDB will replace and update any existing documents.
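
For illustration, a minimal sketch of that behavior, assuming the separate MongoDB Spark connector (mongo-spark-connector) rather than mongo-hadoop; the output URI mirrors the question's target collection:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("mongo spark save example")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/test.target")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// each row carries an _id, so save() replaces a matching document or inserts a new one
val df = sqlContext.createDataFrame(Seq(("1", "this data will be added"))).toDF("_id", "added")
MongoSpark.save(df)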

answered 2017-07-20T14:54:14.587