3

我对 MongoDB 比较陌生,但我们考虑将其用作遗留服务前的某种缓存。在这种情况下,我们偶然发现了一些问题。

首先,一些解释。

此缓存服务将位于旧服务和客户端之间。客户端将连接到缓存服务,该服务从旧服务获取数据。缓存服务每 X 分钟获取一次数据,并将它们保存在 MongoDB 中。架构真的很简单:只是一个包含很多键/值的文档。没有嵌套文档等。此外,我们将 _id 设置为来自旧服务的唯一 ID,因此我们也可以控制它。

当缓存服务从旧服务获取数据时,它只得到一个增量(仅自上次获取以来发生变化)。因此,如果自上次以来有 5 个“对象”发生了变化,那么您只会得到这 5 个“对象”(但您会得到完整的对象,而不是对象的增量)。如果任何新的“对象”已添加到旧服务中,那么这些当然也在增量中。

我们的“问题”

在我看来,这听起来像是一个更新。如果有新对象,请插入它们。如果现有对象发生更改,请更新它们。然而,MongoDB 似乎并不特别喜欢多个 upsert。只需插入就会给我一个关于重复键的错误,这是完全可以理解的,因为已经存在具有相同 _id 的文档。可以接受 upsert 参数的更新函数不能接受新对象的列表。在我看来,单个查询是不可能的。不过,我有可能在这里完全忽略了一些东西。

可能的解决方案

有许多不同的解决方案,尤其是我想到的两个:

  • 做两个查询:首先,计算一个包含所有 _id 的列表(请记住,我们从旧服务中获得这些)。然后,使用 $in 函数连同 _id 列表删除它们,并立即插入新文档。这实际上应该用新数据更新我们的集合。它也很容易实现。可能出现的一个问题是客户端在删除和插入之间请求数据,因此错误地得到一个空结果。这是一个交易破坏者,绝对不会发生。
  • 每个更改的对象执行一次更新插入。也很容易实现,并且不应该给出与其他解决方案相同的问题。不过,这还有其他(可能是想象的)问题。它可以在短时间内处理多少个 upsert?它每分钟可以轻松处理 5000 个 upsert 吗?这些不是大文档,只有大约 20 个键/值,没有子文档。这个数字是凭空捏造的,实际数字很难预测。在我看来,这种方法感觉不对。我不明白为什么有必要为每个新文档运行一个查询。

对于两个提议的解决方案和任何其他解决方案,任何帮助都将不胜感激。作为旁注,技术不是真正可讨论的,所以请不要建议其他类型的数据库或语言。我们选择我们选择的东西还有其他强有力的原因:)

4

4 回答 4

1

我会分享我的经验...

在我上一份工作中,我们也遇到过类似的情况。我们最终对每个文档/对象进行了一次查询/写入。我们使用 Mule ESB 将数据从遗留系统泵送到 Mongo,每次写入都是一次 upsert。

表演相当不错,但不是很好。我们可以在几分钟内将数千个文档导入 Mongo。这些文件相当丰富,所以这可能是我们不得不限制对 Mongo 的写入的部分原因。

在我们批量加载数据后,“实时”性能不再是问题。

您建议的第一个选项听起来太复杂,并且可能会使 Mongo 处于未知状态,以防操作在更新中途终止。upsert 选项为我们节省了很多时间,因为我们可以一遍又一遍地重播插入并且是安全的。

于 2013-11-01T16:08:58.133 回答
1

要扩展 ryan1234 的答案:

2.6 版本的 MongoDB 将具备发送批量更新的能力。现在,您需要为每个文件提交单独的请求。

正如 ryan1234 所说,如果您不知道旧版提供者的信息,那么对每个文档进行 upsert 是更新所有现有文档并添加新文档的唯一安全方法。单个 MongoDB 进程可以在中端硬件上轻松处理每秒数千次更新 (1)。如果您没有获得该级别的性能,则可能是客户端和 MongDB 服务器之间的请求延迟。异步 Java 驱动程序可以通过允许同时向服务器发送多个更新请求来帮助克服该限制,同时具有最小的客户端复杂性/线程。

HTH,罗伯

1:我假设文档没有增长,也没有索引更新,但即使有了这些,您也应该能够每秒进行一千次更新。

于 2013-11-01T19:32:37.000 回答
1

或者,如果您的密钥是复合的,您可以使用:

public static BulkWriteResult insertAll(MongoCollection<Document> coll, List<Document> docs, String[] keyTags, boolean upsert) {
    if(docs.isEmpty())
        return null;
    List<UpdateOneModel<Document>> requests = new ArrayList<>(docs.size());
    UpdateOptions opt = new UpdateOptions().upsert(upsert);
    for (Document doc : docs ) {
        BasicDBObject filter = new BasicDBObject();
        for (String keyTag : keyTags) {
            filter.append(keyTag, doc.get(keyTag));
        }
        BasicDBObject action = new BasicDBObject("$set", doc);
        requests.add(new UpdateOneModel<Document>(filter, action, opt));
    }
    return coll.bulkWrite(requests);
}
于 2015-05-29T15:34:35.027 回答
0

我知道。它必须深入挖掘才能找到正确的方法。试试这个: /** * 将文档中的所有项目插入到集合中。* @param coll 目标集合 * @param docs 新的或更新的文档 * @param keyTag 文档中键的名称 * @param upsert 如果为 true,如果未找到则创建新文档 * @return BulkWriteResult 或 null 如果文档。是空的() */

    public static BulkWriteResult insertAll(MongoCollection<Document> coll, List<Document> docs, String keyTag, boolean upsert) {
    if(docs.isEmpty())
        return null;
    List<UpdateOneModel<Document>> requests = new ArrayList<>(docs.size());
    UpdateOptions opt = new UpdateOptions().upsert(upsert);
    for (Document doc : docs ) {
        BasicDBObject filter = new BasicDBObject(keyTag, doc.get(keyTag)); 
        BasicDBObject action = new BasicDBObject("$set", doc);
        requests.add(new UpdateOneModel<Document>(filter, action, opt));
    }
    return coll.bulkWrite(requests);
}
于 2015-05-29T15:28:01.453 回答