6

我正在编写一个函数,该函数运行 API 调用并通过偏移量从一个巨大的数据库按顺序请求 JSON。解析 JSON 响应,然后将其中的后续数据上传到我们的 Cloud Firestore 服务器。

Nodejs (Node 6.11.3) 和最新的 Firebase Admin SDK

信息按预期解析,并完美打印到控制台。但是,当数据尝试上传到我们的 Firestore 数据库时,控制台会收到以下错误消息:

身份验证错误:错误:套接字挂起

(节点:846)UnhandledPromiseRejectionWarning:未处理的承诺拒绝(拒绝ID:-Number-):错误:从插件获取元数据失败并出现错误:套接字挂起

偶尔:

验证错误:错误:读取 ECONNRESET

forEach 函数从下载的 JSON 中收集项目并在上传到 Firestore 数据库之前处理数据。每个 JSON 最多有 1000 条数据(价值 1000 个文档)通过 forEach 函数传递。我知道如果函数在上传集完成之前重复,这可能是一个问题?

我是一名编码新手,并且了解此功能的控制流程并不是最好的。但是,我找不到有关控制台打印错误的任何信息。我可以找到大量关于套接字挂断的信息,但在 Auth 错误部分没有。

我使用生成的服务帐户 JSON 作为凭据来访问我们的数据库,该数据库使用 firebase-adminsdk 帐户。我们对数据库的读/写规则目前是开放的,允许任何访问(因为我们正在开发中,没有真正的用户)。

这是我的功能:

Firebase 初始化和偏移归零

 const admin = require('firebase-admin');
    var serviceAccount = require("JSON");
    admin.initializeApp({
    credential: admin.credential.cert(serviceAccount),
    databaseURL: "URL"
    });
    var db = admin.firestore();
    var offset = 0;
    var failed = false;

运行函数并设置 HTTP 标头

var runFunction = function runFunction() {
    var https = require('https');
    var options = {
        host: 'website.com',
        path: (path including an offset and 1000 row specifier),
        method: 'GET',
        json: true,
        headers: {
            'content-type': 'application/json',
            'Authorization': 'Basic ' + new Buffer('username' + ':' + 'password').toString('base64')
        }
    };

如果我们还没有到达 API 响应的末尾,则运行 HTTP 请求并重新运行该函数

if (failed === false) {
        var req = https.request(options, function (res) {
            var body = '';
            res.setEncoding('utf8');
            res.on('data', function (chunk) {
                body += chunk;
            });
            res.on('end', () => {
                console.log('Successfully processed HTTPS response');
                body = JSON.parse(body);
                if (body.hasOwnProperty('errors')) {
                    console.log('Body ->' + body)
                    console.log('API Call failed due to server error')
                    console.log('Function failed at ' + offset)
                    req.end();
                    return
                } else {
                    if (body.hasOwnProperty('result')) {
                        let result = body.result;
                        if (Object.keys(result).length === 0) {
                            console.log('Function has completed');
                            failed = true;
                            return;
                        } else {
                            result.forEach(function (item) {
                                var docRef = db.collection('collection').doc(name);
                                console.log(name);
                                var upload = docRef.set({
                                    thing: data,
                                    thing2: data,
                                })
                            });
                            console.log('Finished offset ' + offset)
                            offset = offset + 1000;
                            failed = false;
                        }
                        if (failed === false) {
                            console.log('Function will repeat with new offset');
                            console.log('offset = ' + offset);
                            req.end();
                            runFunction();
                        } else {
                            console.log('Function will terminate');
                        }
                    }
                }
            });
        });
        req.on('error', (err) => {
            console.log('Error -> ' + err)
            console.log('Function failed at ' + offset)
            console.log('Repeat from the given offset value or diagnose further')
            req.end();
        });
        req.end();
    } else {
        req.end();
    }
    };
    runFunction();

任何帮助将不胜感激!

更新

我刚刚尝试更改一次提取的 JSON 行,然后使用该函数一次上传 - 从 1000 到 100。套接字挂起错误的频率较低,因此肯定是由于数据库过载。

理想情况下,如果每个 forEach 数组迭代在开始之前等待前一次迭代完成,那将是完美的。

更新#2

我已经安装了 async 模块,我目前正在使用 async.eachSeries 函数一次执行一个文档上传。上传过程中的所有错误都会消失 - 但是该功能将花费大量时间才能完成(158,000 个文档大约需要 9 小时)。我更新的循环代码是这样的,实现了一个计数器:

async.eachSeries(result, function (item, callback) {
    // result.forEach(function (item) {
    var docRef = db.collection('collection').doc(name);
    console.log(name);
    var upload = docRef.set({
      thing: data,
      thing2: data,
    }, { merge: true }).then(ref => {
        counter = counter + 1
        if (counter == result.length) {
            console.log('Finished offset ' + offset)
            offset = offset + 1000;
            console.log('Function will repeat with new offset')
            console.log('offset = ' + offset);
            failed = false;
            counter = 0
            req.end();
            runFunction();
        }
        callback()
    });
});

此外,一段时间后数据库返回此错误:

(节点:16168)UnhandledPromiseRejectionWarning:未处理的承诺拒绝(拒绝ID:-Number-):错误:数据存储操作超时,或数据暂时不可用。

似乎现在我的功能花费了太长时间......而不是不够长。有没有人有任何关于如何使这个运行速度更快而不出现错误的建议?

4

3 回答 3

3

作为此循环的一部分的写入请求只是超出了 Firestore 的配额 - 因此服务器拒绝了其中的大多数。

为了解决这个问题,我将我的请求转换为一次上传大约 50 个项目的块,Promises 确认何时移动到下一个块上传。

答案发布在这里 ->在 node.js 中一次遍历一个包含 50 个项目的数组,我的工作代码的模板如下:

async function uploadData(dataArray) {
  try {
    const chunks = chunkArray(dataArray, 50);
    for (const [index, chunk] of chunks.entries()) {
      console.log(` --- Uploading ${index + 1} chunk started ---`);
      await uploadDataChunk(chunk);
      console.log(`---Uploading ${index + 1} chunk finished ---`);
    }
  } catch (error) {
    console.log(error)
    // Catch en error here
  }
}

function uploadDataChunk(chunk) {
  return Promise.all(
    chunk.map((item) => new Promise((resolve, reject) => {
      setTimeout(
        () => {
          console.log(`Chunk item ${item} uploaded`);
          resolve();
        },
        Math.floor(Math.random() * 500)
      );
    }))
  );
}

function chunkArray(array, chunkSize) {
  return Array.from(
    { length: Math.ceil(array.length / chunkSize) },
    (_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
  );
}

将数据数组传递给uploadData - 使用uploadData(data); 并将每个项目的上传代码发布到 chunk.map 函数中 setTimeout 块内(在 resolve() 行之前)的 uploadDataChunk 中。

于 2017-10-12T11:13:16.770 回答
0

我通过在循环中链接承诺并在每个承诺之间等待 50 毫秒来解决这个问题。

function Wait() {
    return new Promise(r => setTimeout(r, 50))
}

function writeDataToFirestoreParentPhones(data) {
    let chain = Promise.resolve();
    for (let i = 0; i < data.length; ++i) {
        var docRef = db.collection('parent_phones').doc(data[i].kp_ID_for_Realm);
        chain = chain.then(()=> {
            var setAda = docRef.set({
                parent_id: data[i].kf_ParentID,
                contact_number: data[i].contact_number,
                contact_type: data[i].contact_type
            }).then(ref => {
                console.log(i + ' - Added parent_phones with ID: ', data[i].kp_ID_for_Realm);
            }).catch(function(error) {
                console.error("Error writing document: ", error);
            });
        })
        .then(Wait)
    }
}
于 2018-01-15T19:18:38.070 回答
0

对我来说,这原来是一个网络问题。

以前分批上传 180,000 个文档 10,000 个文件对我来说没有问题,今天使用公共的较慢的 wifi 连接,我收到了该错误。

切换回我的 4G 移动连接为我解决了问题。不确定这是否是速度问题——可能是安全问题——但我会接受这个假设。

于 2018-12-05T08:49:55.410 回答