0

我正在尝试使用mongo-hadoop连接器将数据读入 spark。问题是,如果我试图设置读取数据的限制,我会在 RDD 中获得限制 * 分区数。

mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants");
mongodbConfig.set("mongo.input.limit","3");
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
            mongodbConfig,            // Configuration
            MongoInputFormat.class,   // InputFormat: read from a live cluster.
            Object.class,             // Key class
            BSONObject.class          // Value class
    );

    long count = documents.count();
    System.out.println("Collection Count: " + count);
    System.out.println("Partitions: " + documents.partitions().size());

//9 elements in the RDD = limit * nrOfPartions = 3 * 3
//3 partitions

这种行为对于其他限制是可重现的(我总是得到限制 * 3)。

如果我尝试简单地通过 objectId 查询,我会得到类似的行为(它创建一个具有相同对象 * 分区数的 RDD - 在我的情况下,3 个元素具有相同的文档)。

如果有帮助,我还可以提供用于创建 mongo 集合的脚本。

4

1 回答 1

1

这是一个功能而不是一个错误。mongo.input.limit用于设置limit参数MongoInputSplit因此它应用于逐个分区而不是全局分区。

一般来说,不可能(或者准确地说,实际)限制全局获取记录的数量。每个拆分都是独立处理的,通常没有关于每个拆分产生的记录数量的先验知识。

于 2016-03-27T22:47:25.613 回答