我正在尝试使用mongo-hadoop连接器将数据读入 spark。问题是,如果我试图设置读取数据的限制,我会在 RDD 中获得限制 * 分区数。
mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants");
mongodbConfig.set("mongo.input.limit","3");
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
long count = documents.count();
System.out.println("Collection Count: " + count);
System.out.println("Partitions: " + documents.partitions().size());
//9 elements in the RDD = limit * nrOfPartions = 3 * 3
//3 partitions
这种行为对于其他限制是可重现的(我总是得到限制 * 3)。
如果我尝试简单地通过 objectId 查询,我会得到类似的行为(它创建一个具有相同对象 * 分区数的 RDD - 在我的情况下,3 个元素具有相同的文档)。
如果有帮助,我还可以提供用于创建 mongo 集合的脚本。