8

嗨,我正在使用 16 个集合插入大约 3-4 百万个 json 对象,每个对象从 5-10k 不等。我使用存储过程来插入这些文档。我有 22 个容量单元。

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
    }

    // Call the CRUD API to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted. 
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreateOrUpdate(doc, callback) {
        var isAccepted = true;
        var isFound = collection.queryDocuments(collectionLink, 'SELECT * FROM root r WHERE r.id = "' + doc.id + '"', function (err, feed, options) {
            if (err) throw err;
            if (!feed || !feed.length) {
                isAccepted = collection.createDocument(collectionLink, doc, callback);
            }
            else {
                // The metadata document.
                var existingDoc = feed[0];
                isAccepted = collection.replaceDocument(existingDoc._self, doc, callback);
            }
        });

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isFound && !isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreateOrUpdate(docs[count], callback);
        }
    }

我的 C# 代码看起来像这样

    public async Task<int> Add(List<JobDTO> entities)
            {

                    int currentCount = 0;
                    int documentCount = entities.Count;

                    while(currentCount < documentCount)
                    {
                        string argsJson = JsonConvert.SerializeObject(entities.Skip(currentCount).ToArray());
                        var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic[]>(argsJson) };

                        // 6. execute the batch.
                        StoredProcedureResponse<int> scriptResult = await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);

                        // 7. Prepare for next batch.
                        int currentlyInserted = scriptResult.Response;

                        currentCount += currentlyInserted;

                    }

                    return currentCount;
            }

我面临的问题是在我尝试插入的 400k 文档中,有时文档会丢失而不会出现任何错误。

该应用程序是部署在云上的工作者角色。如果我增加在 documentDB 中插入的线程或实例的数量,那么丢失的文档数量会高得多。

如何找出问题所在。在此先感谢。

4

3 回答 3

10

我发现在尝试此代码时,我会在 docs.length 处收到一个错误,指出长度未定义。

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length; // length is undefined
}

经过多次测试(在 Azure 文档中找不到任何内容),我意识到我无法按照建议传递数组。参数必须是一个对象。我必须像这样修改批处理代码才能运行。

我还发现我也不能简单地尝试在 DocumentDB 脚本资源管理器(输入框)中传递一组文档。即使占位符帮助文本说你可以。

这段代码对我有用:

// psuedo object for reference only
docObject = {
  "items": [{doc}, {doc}, {doc}]
}

function bulkImport(docObject) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    // Check input
    if (!docObject.items || !docObject.items.length) throw new Error("invalid document input parameter or undefined.");
    var docs = docObject.items;
    var docsLength = docs.length;
    if (docsLength == 0) {
        context.getResponse().setBody(0);
    }

    // Call the funct to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Obviously I have truncated this function. The above code should help you understand what has to change.
}

如果我错过了 Azure 文档,希望它能够赶上或变得更容易找到。

我还将为 Script Explorer 提交错误报告,希望 Azurites 能够更新。

于 2016-02-19T19:00:29.873 回答
4

需要注意的是,存储过程有界执行,其中所有操作都必须在服务器指定的请求超时时间内完成。如果操作未在该时间限制内完成,则事务将自动回滚。为了简化开发以处理时间限制,所有 CRUD(创建、读取、更新和删除)操作都返回一个布尔值,表示该操作是否会完成。该布尔值可用作结束执行的信号,并用于实现基于延续的模型以恢复执行(这在下面的代码示例中进行了说明)。

上面提供的批量插入存储过程通过返回成功创建的文档数来实现延续模型。这在存储过程的注释中注明:

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client, 
    // which will call the script again with remaining set of docs.
    // This condition will happen when this stored procedure has been running too long
    // and is about to get cancelled by the server. This will allow the calling client
    // to resume this batch from the point we got to before isAccepted was set to false
    if (!isFound && !isAccepted) getContext().getResponse().setBody(count);

如果输出文档计数小于输入文档计数,您将需要使用剩余的文档集重新运行存储过程。

于 2015-11-04T22:45:26.643 回答
2

自 2018 年 5 月以来,Cosmos DB 有一个新的 Batch SDK。有一个GitHub 存储库可以帮助您入门。

我已经能够在 9 秒内导入 100.000 条记录。并且使用 Azure Batch 扇出插入,我在 1 分 15 秒内完成了 1900 万条记录。这是一个 166 万 RU/s 的集合,您显然可以在导入后按比例缩小。

于 2018-07-21T01:37:08.850 回答