0

我正在处理一个政治竞选捐款数据集,它最终变成了一个大约 500mb 的 JSON 文件(最初是一个 124mb 的 CSV)。这个文件太大,无法通过 Firebase 网页界面导入(我试过,但 Google Chrome 的标签页随后就崩溃了)。于是我尝试在从 CSV 生成对象的同时手动上传它们(使用 CSVtoJSON 转换器,每一行都会变成一个 JSON 对象,每生成一个对象我就把它上传到 Firebase)。

这是我使用的代码。

// Streams ./vUPLOAD_MASTER.csv through csvtojson and writes each parsed row to
// Firebase at /reports<section>/<Report_ID>. Rows are bucketed 1000 per
// section, and the process exits once 100 sections have been filled.
var firebase = require('firebase');
var Converter = require("csvtojson").Converter;

firebase.initializeApp({
  serviceAccount: "./credentials.json",
  databaseURL: "url went here"
});

var converter = new Converter({
  constructResult: false,
  workerNum: 4
});

var ref = firebase.database().ref("/");

var ROWS_PER_SECTION = 1000;
var MAX_SECTIONS = 100;

var lastindex = 0; // index of the most recent row handed to Firebase
var count = 0;     // rows written into the current section so far
var section = 0;   // numeric suffix of the current "reports<N>" node

/**
 * Handles one "record_parsed" event: writes the row to Firebase and advances
 * the section/row counters.
 *
 * @param {Object} resultRow - Parsed row; its Report_ID is used as the child key.
 * @param {*} rawRow - Raw row supplied by the converter (unused here).
 * @param {number} rowIndex - Index of the row as reported by the converter.
 */
function uploadRow(resultRow, rawRow, rowIndex) {
  // Kept as a negated ">=" so non-numeric indices still take this branch.
  if (!(rowIndex >= 0)) {
    console.log("we out of indices");
    process.exit();
    return;
  }

  // NOTE(review): set() is called without a completion callback, so nothing
  // here waits for the write to finish before a later process.exit().
  ref.child("reports" + section).child(resultRow.Report_ID).set(resultRow);
  console.log("Report uploaded, count at " + count + ", section at " + section);

  count += 1;
  lastindex = rowIndex;

  if (count >= ROWS_PER_SECTION) {
    count = 0;
    section += 1;
  }

  if (section >= MAX_SECTIONS) {
    console.log("last completed index: " + lastindex);
    process.exit();
  }
}

converter.on("record_parsed", uploadRow);

require("fs").createReadStream("./vUPLOAD_MASTER.csv").pipe(converter);

但是,这种做法遇到了内存问题,无法处理完整个数据集。分块上传也行不通,因为 Firebase 没有显示所有已上传的数据,我也无法确定上次是在哪里中断的。(在 Chrome 中打开 Firebase 数据库时,我能看到数据不断进入,但标签页最终会崩溃,重新加载后很多数据都不见了。)

然后我尝试使用Firebase Streaming Import,但是会引发此错误:

started at 1469471482.77
Traceback (most recent call last):
  File "import.py", line 90, in <module>
    main(argParser.parse_args())
  File "import.py", line 20, in main
    for prefix, event, value in parser:
  File "R:\Python27\lib\site-packages\ijson\common.py", line 65, in parse
    for event, value in basic_events:
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 185, in basic_parse
    for value in parse_value(lexer):
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 127, in parse_value
    raise UnexpectedSymbol(symbol, pos)
ijson.backends.python.UnexpectedSymbol: Unexpected symbol u'\ufeff' at 0

搜索最后一行(来自 ijson 的错误)后,我找到了这个 SO 帖子,但我不确定应该如何利用它让 Firebase Streaming Import 正常工作。

我使用 Vim 从尝试上传的 JSON 文件中删除了字节顺序标记,现在在运行导入器大约一分钟后出现此错误:

Traceback (most recent call last):
  File "import.py", line 90, in <module>
    main(argParser.parse_args())
  File "import.py", line 20, in main
    for prefix, event, value in parser:
  File "R:\Python27\lib\site-packages\ijson\common.py", line 65, in parse
    for event, value in basic_events:
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 185, in basic_parse
    for value in parse_value(lexer):
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 116, in parse_value
    for event in parse_array(lexer):
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 138, in parse_array
    for event in parse_value(lexer, symbol, pos):
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 119, in parse_value
    for event in parse_object(lexer):
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 170, in parse_object
    pos, symbol = next(lexer)
  File "R:\Python27\lib\site-packages\ijson\backends\python.py", line 51, in Lexer
    buf += data
MemoryError

Firebase Streaming Importer 应该能够处理超过 250mb 的文件,而且我相当确定我有足够多的 RAM 来处理这个文件。关于为什么会出现此错误的任何想法?

如果查看我尝试用 Firebase Streaming Import 上传的实际 JSON 文件会有帮助,文件在这里。

4

1 回答 1

0

我最终放弃了 Firebase Streaming Import,自己编写了一个工具来解决这个问题:它使用 csvtojson 转换 CSV,然后通过 Firebase Node API 逐个上传对象。

这是脚本:

// Uploads the rows of test.csv to Firebase under /reports/<Report_ID>.
//
// Each input line is parsed on its own by a fresh csvtojson Converter, with
// the known header row prepended. Every write is tracked through the
// completion callback of set(), so the script knows when Firebase has
// acknowledged all rows and exits by itself at that point instead of having
// to be left running for an arbitrary amount of time.
var firebase = require("firebase");
firebase.initializeApp({
  serviceAccount: "./credentials.json",
  databaseURL: "https://necir-hackathon.firebaseio.com/"
});

var db = firebase.database();
var ref = db.ref("/reports");
var fs = require('fs');
var Converter = require("csvtojson").Converter;
var header = "Report_ID,Status,CPF_ID,Filing_ID,Report_Type_ID,Report_Type_Description,Amendment,Amendment_Reason,Amendment_To_Report_ID,Amended_By_Report_ID,Filing_Date,Reporting_Period,Report_Year,Beginning_Date,Ending_Date,Beginning_Balance,Receipts,Subtotal,Expenditures,Ending_Balance,Inkinds,Receipts_Unitemized,Receipts_Itemized,Expenditures_Unitemized,Expenditures_Itemized,Inkinds_Unitemized,Inkinds_Itemized,Liabilities,Savings_Total,Report_Month,UI,Reimbursee,Candidate_First_Name,Candidate_Last_Name,Full_Name,Full_Name_Reverse,Bank_Name,District_Code,Office,District,Comm_Name,Report_Candidate_First_Name,Report_Candidate_Last_Name,Report_Office_District,Report_Comm_Name,Report_Bank_Name,Report_Candidate_Address,Report_Candidate_City,Report_Candidate_State,Report_Candidate_Zip,Report_Treasurer_First_Name,Report_Treasurer_Last_Name,Report_Comm_Address,Report_Comm_City,Report_Comm_State,Report_Comm_Zip,Category,Candidate_Clarification,Rec_Count,Exp_Count,Inkind_Count,Liab_Count,R1_Count,CPF9_Count,SV1_Count,Asset_Count,Savings_Account_Count,R1_Item_Count,CPF9_Item_Count,SV1_Item_Count,Filing_Mechanism,Also_Dissolution,Segregated_Account_Type,Municipality_Code,Current_Report_ID,Location,Individual_Or_Organization,Notable_Contributor,Currently_Accessed"
var count = 0;          // rows handed to Firebase so far
var failed = 0;         // rows whose write Firebase reported as failed
var pending = 0;        // lines read whose processing has not finished yet
var inputDone = false;  // set once the whole input file has been read
var lineReader = require('readline').createInterface({
  input: fs.createReadStream('test.csv')
});

// Exits once the input is exhausted and no line is still being processed.
function finishIfDone() {
    if (inputDone && pending === 0) {
        console.log("All writes acknowledged: " + count + " sent, " + failed + " failed.");
        process.exit(failed > 0 ? 1 : 0);
    }
}

// Marks one line as fully processed (saved, failed, or skipped).
function settle() {
    pending -= 1;
    finishIfDone();
}

lineReader.on('line', function (line) {
    pending += 1;
    var escapedLine = line.replace(/'/g, "\\'");
    var csvString = header + '\n' + escapedLine;
    var converter = new Converter({});
    converter.fromString(csvString, function (err, result) {
        if (err) {
            var errstring = err + "\n";
            fs.appendFile('converter_error_log.txt', errstring, function (appendErr) {
                if (appendErr) {
                    console.log("Converter: Append Log File Error Below:");
                    console.error(appendErr);
                    process.exit(1);
                } else {
                    console.log("Converter Error Saved");
                    settle();
                }
            });
            return;
        }

        // A line that yields no record (e.g. a blank line) has nothing to upload.
        if (!result || result.length === 0) {
            settle();
            return;
        }

        var report = result[0];
        report.Location = "";
        report.Individual_Or_Organization = "";
        report.Notable_Contributor = "";
        report.Currently_Accessed = "";

        var reportRef = ref.child(report.Report_ID);
        count += 1;
        var sentNumber = count;
        // The completion callback fires once the server has accepted or
        // rejected the write; only then is this row counted as finished.
        reportRef.set(report, function (writeErr) {
            if (writeErr) {
                failed += 1;
                console.error("Write #" + sentNumber + " failed:", writeErr);
            }
            settle();
        });
        console.log("Sent #" + sentNumber);
    });
});

lineReader.on('close', function () {
    inputDone = true;
    // Covers an empty input file, or writes that all finished before EOF.
    finishIfDone();
});

唯一需要注意的是,虽然脚本可以很快把所有对象发送出去,但 Firebase 显然需要在保存这些对象期间保持连接,因为如果在所有对象发送完后立即关闭脚本,很多对象就不会出现在数据库中。(为了保险起见我等了 20 分钟,但实际需要的时间可能更短。)

于 2016-07-30T03:11:32.983 回答