4

我编写了一个小async脚本,将大量 JSON 文件批量插入到 MongoDB 分片集群中。这是我第一次使用这个模块(我还在学习 Node.js)。我不知道我做得对不对。

  • 代码是瀑布 (1) 的最后一部分:之前的函数以具有db,collfiles属性的对象结束。
  • files数组包含数百个文件路径,并且应用于数组每个元素的函数同样是瀑布 (2)。
  • 瀑布(2)由以下部分组成:读取、解析、插入。当这个瀑布结束时 (3) 我调用complete以完成对数组中单个项目的处理,并传递错误(如果有的话)。

到目前为止一切顺利,对吗?

我无法理解的是async.eachLimit回调(4)内部发生了什么。从文档中:

在所有迭代器函数完成或发生错误后调用的回调。

也就是说,当所有函数都完成后,next()调用 (5) 结束脚本。但是根据文档,当发生单个错误时会调用相同的回调 (4)。那是我的脚本在单个文件失败时停止。

我怎样才能避免这种情况?

async.waterfall([ // 1
    // ...
    function (obj, next) {
        async.eachLimit(obj.files, 1000,
            function (file, complete) {
                async.waterfall([ // 2
                    function (next) {
                        fs.readFile(file, {}, function (err, data) {
                            next(err, data);
                        });
                    },
                    function (data, next) { // Parse (assuming all well formed)
                        next(null, JSON.parse(data));
                    },
                    function (doc, next) { // Insert
                        obj.coll.insert(doc, {w: 1}, function (err, doc) {
                            next(err);
                        });
                    }
                ], function (err, result) { // 3
                    complete(err);
                });
            },
            function (err) { // 4
                if (err) console.error(err);
                next(null, obj); // 5
            }
        );
    }
], function (err, obj) { // Waterfall end
    if (err) console.error(err);
    obj.db.close(); // Always close the connection
});
4

1 回答 1

4

如果您不希望它在发生错误时中断,您应该使用虚假的第一个参数调用回调,就像这样(注意 // 3)。你可以吗/我理解正确吗?

async.waterfall([ // 1
    // ...
    function (obj, next) {
        async.eachLimit(obj.files, 1000,
            function (file, complete) {
                async.waterfall([ // 2
                    function (next) {
                        fs.readFile(file, {}, function (err, data) {
                            next(err, data);
                        });
                    },
                    function (data, next) { // Parse (assuming all well formed)
                        next(null, JSON.parse(data));
                    },
                    function (doc, next) { // Insert
                        obj.coll.insert(doc, {w: 1}, function (err, doc) {
                            next(err);
                        });
                    }
                ], function (err, result) { // 3
                    if (err) {
                        console.log(file + ' threw an error');
                        console.log(err);
                        console.log('proceeding with execution');
                    }
                    complete();
                });
            },
            function (err) { // 4
                next(null, obj); // 5
            }
        );
    }
], function (err, obj) { // Waterfall end
    if (err) console.error(err);
    obj.db.close(); // Always close the connection
});
于 2013-05-08T12:43:37.150 回答