2

我正在编写一些代码,在 Node.js 中处理 CSV 文件并将其内容存储到 Oracle 数据库中。到目前为止一切进展顺利,但由于 CSV 中有大量的行,我遇到了“ORA-01000:超出打开游标的最大数”错误。我在脚本开始时连接到 Oracle 一次。对于 CSV 中的每条记录,我会执行多条 SELECT、INSERT 和 DELETE 语句,然后转到下一个条目,使用同一个连接处理所有记录。最后,我关闭连接。一个想法是每次都从连接池中获取一个新连接,但我在其他帖子中读到应该只使用一个连接。也许我需要进行某种特殊设置,才能在一个连接上处理所有这些查询?

脚本有点长,所以我只发布重要的部分......如果需要,我可以发布更多。使用了 Q、csvtojson 和 oracledb。

...

// Module-level Oracle connection handle. It starts as null and is assigned by
// connect(); the query helpers in this file call conn.execute(...) on it.
var conn = null;

/**
 * Open the Oracle connection used by the rest of the script.
 *
 * Sets `oracledb.outFormat` to `oracledb.OBJECT` and, on success, stores the
 * new connection in the module-level `conn` variable.
 *
 * @returns {Promise} Resolved (with no value) once `conn` has been set;
 *     rejected with an Error if the connection could not be opened.
 */
function connect() {
    var deferred = Q.defer();

    oracledb.outFormat = oracledb.OBJECT;

    oracledb.getConnection(
        {
            user                : 'foo',
            password            : 'bar',
            connectString       : 'foo.bar/bar',
        },
        function(err, c) {
            if (err) {
                // Reject with the driver's own error object (as the other
                // helpers in this file do) so its details are not lost; only
                // wrap values that are not already Error instances.
                deferred.reject(err instanceof Error ? err : new Error(err));

                // Stop here: falling through would overwrite the global
                // connection with an undefined handle.
                return;
            }

            // set global connection
            conn = c;
            deferred.resolve();
        }
    );

    return deferred.promise;
}

/**
 * Release the given connection.
 *
 * @param {Object} conn - Connection object exposing `release(callback)`.
 * @returns {Promise} Resolved (with no value) when the release succeeds;
 *     rejected with the error passed to the release callback otherwise.
 */
function closeConnection(conn) {
    var pending = Q.defer();

    // Translate the node-style callback outcome into the promise state.
    function onReleased(releaseError) {
        if (!releaseError) {
            return pending.resolve();
        }
        pending.reject(releaseError);
    }

    conn.release(onReleased);

    return pending.promise;
}

/**
 * Process All Data, Promise Loop.
 *
 * Handles the entries of `data` one at a time, in order: each entry is removed
 * from the front of the array and passed to processEntry(), and the next entry
 * is only started once the previous one has finished.
 *
 * NOTE: inside this module the name `process` shadows Node's global `process`
 * object.
 *
 * @param {Array} data - Entries to process; the array is emptied as a side
 *     effect.
 * @returns {Promise} Resolved with `true` once every entry has been handled.
 */
function process(data) {
    // Nothing to do for an empty (or missing) batch; without this guard
    // processEntry() would be called with `undefined`.
    if (!data || data.length === 0) {
        return Q(true);
    }

    return processEntry(data.shift()).then(function() {

        console.log('Finished processing entry.');

        return data.length > 0 ? process(data) : true;
    });
}

/*
 * Process an Entry
 *
 * Runs the per-entry promise chain: checks whether the entry already exists,
 * looks up the user and the panel code, creates the header, (elided steps),
 * and finally logs the entry.
 *
 * Any rejection in the chain is caught and written to the console as
 * "DATA ERROR: ...", after which the returned promise is still resolved.
 * Failures of a single entry are therefore not propagated to the caller.
 *
 * Returns a promise that is resolved (with no value) when the chain is done.
 */
function processEntry(entry) {
    var deferred = Q.defer();

    // Scratch object for values gathered along the chain (e.g. the user row).
    var data = {};

    entryExists(entry)
        .then(function(result) {
            // entryExists() resolves with a boolean.
            // NOTE(review): entryLogicError is presumably meant to return a
            // rejected promise (per its own comment) so the chain jumps to
            // the catch below; confirm against its real implementation.
            if(result) return entryLogicError(null, 'Entry exists. Skipping.');
            else return getUserFromReleaseCode(entry);
        })
        .then(function(result) {
            // result holds the rows resolved by getUserFromReleaseCode();
            // only the first row is kept.
            if(typeof result != 'undefined' && result.length > 0) {
                data.user = result[0];
                return getPanelCode(entry);
            }
            else return entryLogicError(entry, 'No valid release code.');
       })
       .then(function(result){
           // The first row returned by getPanelCode() supplies the header values.
           if(typeof result != 'undefined' && result.length > 0) {
               return createHeader(result[0].foo, result[0].bar);
           }
           else return entryLogicError(entry, 'No valid panel code.');
       })   

     ... More of the same kind of statements processing the entry ...

       .then(function() {
           return logEntry(entry);
       })
       // Errors are reported on the console and then swallowed here.
       .catch(function(error) { console.log("DATA ERROR: " + error) })
       .done(function(){
           // Reached after the catch above has handled any failure.
           deferred.resolve();
       });


    return deferred.promise;
}

/*
 * Report an entry that could not be processed (implementation elided here).
 *
 * NOTE(review): processEntry() calls this with two arguments, the entry (or
 * null) and a message string, although no parameters are declared here.
 */
function entryLogicError() {
    // logs entry to be corrected, return a rejected promise to go to the next entry
}

/**
 * Check if record has been processed.
 *
 * Queries TBL_FOO for rows matching the entry's FOO and BAR values (read via
 * the ENTRY_CONST keys) on the shared connection.
 *
 * @param {Object} entry - Parsed entry to look up.
 * @returns {Promise} Resolved with `true` when at least one row is returned,
 *     `false` otherwise; rejected with the error passed to the callback.
 */
function entryExists(entry) {
    var pending = Q.defer();

    var binds = [entry[ENTRY_CONST.FOO], entry[ENTRY_CONST.BAR]];
    var sql = [
        'SELECT * FROM TBL_FOO ',
        'WHERE FOO = :foo AND ',
        'BAR = :bar'
    ].join('');

    conn.execute(sql, binds, function(queryError, result) {
        if (queryError) {
            pending.reject(queryError);
            return;
        }

        var found = result.rows.length > 0;
        pending.resolve(found);
    });

    return pending.promise;
}

/**
 * Get User from Release Code.
 *
 * Queries TBL_BAR for rows whose FOO column matches the entry's FOO value
 * (read via ENTRY_CONST.FOO) on the shared connection.
 *
 * @param {Object} entry - Parsed entry holding the value to look up.
 * @returns {Promise} Resolved with the rows of the query result; rejected
 *     with the error passed to the callback.
 */
function getUserFromReleaseCode(entry) {
    var pending = Q.defer();

    var binds = [entry[ENTRY_CONST.FOO]];
    var sql = ['SELECT * FROM TBL_BAR ', 'WHERE FOO = :foo'].join('');

    conn.execute(sql, binds, function(queryError, result) {
        if (queryError) {
            pending.reject(queryError);
            return;
        }

        pending.resolve(result.rows);
    });

    return pending.promise;
}


  /* Create Header */
/**
 * Insert a header row into TBL_DR_FOO and commit it.
 *
 * The statement is an anonymous PL/SQL block that inserts the row, returns
 * the new DR_FOO_ID through an OUT bind and issues a COMMIT.
 *
 * @param {*} foo - Value bound to :foo.
 * @param {*} bar - Value bound to :bar.
 * @returns {Promise} Resolved with the `outBinds` of the execution result;
 *     rejected with the error passed to the callback.
 */
function createHeader(foo, bar) {
    var pending = Q.defer();

    var plsql = [
        'BEGIN INSERT INTO TBL_DR_FOO VALUES (NULL,:foo, :bar,',
        '1,NULL,1,NULL,NULL,NULL,NULL) RETURNING DR_FOO_ID INTO :DR_FOO_ID; COMMIT; END;'
    ].join('');

    var binds = {
        foo: foo,
        bar: bar,
        DR_FOO_ID: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER }
    };

    conn.execute(plsql, binds, function(execError, result) {
        if (execError) {
            pending.reject(execError);
            return;
        }

        pending.resolve(result.outBinds);
    });

    return pending.promise;
}

/*
 * Delete old history records (implementation elided here).
 *
 * Called from the main flow after all entries have been processed.
 */
function cleanHistory() {
    // statement that deletes records from a certain date using conn.execute(..)
}

/*
 * Main
 *
 * Connects once, streams the batch file through the CSV converter, processes
 * every parsed entry over the shared connection, cleans up old history and
 * finally releases the connection.
 */
connect().then(function(){
    var converter = new Converter({ noheader: false });

    converter.on('end_parsed', function(data) {

        process(data).then(function(){
            return cleanHistory();
        })
        // Release the connection whether processing/cleanup succeeded or
        // failed, so a failure does not leak it.
        .fin(function(){
            // closeConnection() expects the connection as its argument;
            // calling it with no argument fails on `undefined.release`.
            return closeConnection(conn);
        }).done();

    });

    fs.createReadStream(batch).pipe(converter);


}, function(err){
    return console.error(err);
});
4

0 回答 0