我正在编写一些代码,用 Node.js 处理 CSV 文件并将其存入 Oracle 数据库。到目前为止一切进展顺利,但由于 CSV 中的行数很多,我遇到了“ORA-01000: maximum open cursors exceeded”(超出最大打开游标数)错误。我在脚本开始时只连接 Oracle 一次。对于 CSV 中的每条记录,我会执行多条 `SELECT`、`INSERT` 和 `DELETE` 语句,然后转到下一条记录,并使用同一个连接处理所有记录。最后,我关闭该连接。我想到的一个办法是每次都从连接池中获取一个新连接,但我在其他帖子中读到应该只使用一个连接。也许我需要设置某个特殊选项,才能在一个连接上处理所有这些查询?
脚本有点长,所以我只贴出重要部分……如有需要,我可以贴出更多。代码使用了 `Q`、`csvtojson` 和 `oracledb`。
...
// Shared Oracle connection; assigned by connect() and read by the query helpers.
var conn = null;

/**
 * Opens the Oracle connection used by the rest of the script and stores it in
 * the module-level `conn` variable.
 *
 * Sets `oracledb.outFormat` to `oracledb.OBJECT` before connecting.
 *
 * @returns {Promise} Q promise fulfilled (with no value) once `conn` has been
 *   assigned, or rejected with an Error when `oracledb.getConnection` fails.
 */
function connect() {
  var deferred = Q.defer();
  oracledb.outFormat = oracledb.OBJECT;
  oracledb.getConnection(
    {
      user: 'foo',
      password: 'bar',
      connectString: 'foo.bar/bar',
    },
    function (err, c) {
      if (err) {
        // Return immediately so a failed attempt never overwrites `conn`
        // with an undefined connection or reaches resolve(). Pass an
        // existing Error through untouched so its stack and driver-specific
        // properties are not lost by re-wrapping.
        deferred.reject(err instanceof Error ? err : new Error(err));
        return;
      }
      // Publish the connection for the query helpers.
      conn = c;
      deferred.resolve();
    }
  );
  return deferred.promise;
}
/**
 * Releases an Oracle connection.
 *
 * @param {Object} conn - Connection to release; must expose
 *   `release(callback)`. Note that this parameter shadows the module-level
 *   `conn`, so callers have to pass the connection explicitly.
 * @returns {Promise} Q promise fulfilled (with no value) once the connection
 *   has been released; rejected with the error reported by `release`, or with
 *   an Error when no connection was supplied.
 */
function closeConnection(conn) {
  var deferred = Q.defer();
  if (!conn) {
    // Report a missing connection through the promise instead of letting
    // `conn.release` throw a synchronous TypeError.
    deferred.reject(new Error('closeConnection: no connection to release'));
    return deferred.promise;
  }
  conn.release(function (err) {
    if (err) deferred.reject(err);
    else deferred.resolve();
  });
  return deferred.promise;
}
/**
 * Processes every entry strictly one after another: the next entry is only
 * started once the promise returned by `processEntry` for the previous one
 * has been fulfilled.
 *
 * The input array is walked by index and is not modified. Empty or missing
 * input is treated as "nothing to do" rather than calling `processEntry`
 * with `undefined`.
 *
 * NOTE(review): at module scope this function name shadows Node's global
 * `process` object for the rest of the file.
 *
 * @param {Array} data - Parsed entries to process, in order.
 * @returns {Promise<boolean>} Promise fulfilled with `true` after the last
 *   entry has been processed.
 */
function process(data) {
  var entries = data || [];
  var index = 0;

  // Handles the entry at `index`, then chains to the following one.
  function next() {
    if (index >= entries.length) return true;
    return processEntry(entries[index++]).then(function () {
      console.log('Finished processing entry.');
      return next();
    });
  }

  // Callers chain `.then` on the result, so even the empty case must
  // return a promise.
  if (entries.length === 0) return Q(true);
  return next();
}
/**
 * Runs the processing pipeline for a single entry as a chain of promise
 * steps: existence check, release-code lookup, panel-code lookup, header
 * creation, (elided steps), and finally logging the entry.
 *
 * Any rejection raised along the chain is caught, logged with the
 * "DATA ERROR: " prefix and not propagated, so in the visible code the
 * returned promise is only ever resolved, never rejected.
 *
 * NOTE(review): `getPanelCode` and `logEntry` are not shown in this listing.
 *
 * @param {Object} entry - One parsed record.
 * @returns {Promise} Q promise resolved (with no value) once the chain has
 *   settled.
 */
function processEntry(entry) {
var deferred = Q.defer();
// Scratch object shared between the steps below; the visible code only
// stores `user` on it.
var data = {};
entryExists(entry)
.then(function(result) {
// `result` is the boolean produced by entryExists. Note that `null` rather
// than the entry is passed to entryLogicError in this branch.
if(result) return entryLogicError(null, 'Entry exists. Skipping.');
else return getUserFromReleaseCode(entry);
})
.then(function(result) {
// Expects a non-empty list of rows; the first row is kept as the user.
if(typeof result != 'undefined' && result.length > 0) {
data.user = result[0];
return getPanelCode(entry);
}
else return entryLogicError(entry, 'No valid release code.');
})
.then(function(result){
// Expects a non-empty list whose first element carries `foo` and `bar`.
if(typeof result != 'undefined' && result.length > 0) {
return createHeader(result[0].foo, result[0].bar);
}
else return entryLogicError(entry, 'No valid panel code.');
})
// NOTE(review): the next line is an elision placeholder from the original
// listing, not executable code.
... More of the same kind of statements processing the entry ...
.then(function() {
return logEntry(entry);
})
// Logs the failure and swallows it, so the .done handler below still runs.
.catch(function(error) { console.log("DATA ERROR: " + error) })
.done(function(){
deferred.resolve();
});
return deferred.promise;
}
/**
 * Handles an entry that cannot be processed as-is.
 *
 * NOTE(review): the body is elided in this listing. According to the inline
 * comment it is meant to log the entry for later correction and return a
 * rejected promise so that processing moves on to the next entry. The call
 * sites in processEntry pass two arguments (an entry or null, and a message)
 * although no parameters are declared here — confirm against the full
 * implementation.
 */
function entryLogicError() {
// logs entry to be corrected, return a rejected promise to go to the next entry
}
/**
 * Tells whether TBL_FOO already contains a row matching this entry.
 *
 * @param {Object} entry - Record whose ENTRY_CONST.FOO and ENTRY_CONST.BAR
 *   fields are used as the lookup values.
 * @returns {Promise<boolean>} Q promise fulfilled with `true` when the query
 *   returned at least one row, `false` otherwise; rejected with the error
 *   reported by `conn.execute`.
 */
function entryExists(entry) {
  var pending = Q.defer();
  var sql = [
    'SELECT * FROM TBL_FOO',
    'WHERE FOO = :foo AND',
    'BAR = :bar'
  ].join(' ');
  // Positional binds, in the order the placeholders appear in the statement.
  var binds = [entry[ENTRY_CONST.FOO], entry[ENTRY_CONST.BAR]];

  conn.execute(sql, binds, function (err, result) {
    if (err) {
      pending.reject(err);
      return;
    }
    pending.resolve(result.rows.length > 0);
  });

  return pending.promise;
}
/**
 * Looks up the TBL_BAR rows whose FOO column matches the entry's
 * ENTRY_CONST.FOO field.
 *
 * @param {Object} entry - Record providing the lookup value.
 * @returns {Promise<Array>} Q promise fulfilled with the rows returned by the
 *   query; rejected with the error reported by `conn.execute`.
 */
function getUserFromReleaseCode(entry) {
  var pending = Q.defer();
  var sql = ['SELECT * FROM TBL_BAR', 'WHERE FOO = :foo'].join(' ');
  var binds = [entry[ENTRY_CONST.FOO]];

  conn.execute(sql, binds, function (err, result) {
    if (err) {
      pending.reject(err);
      return;
    }
    pending.resolve(result.rows);
  });

  return pending.promise;
}
/**
 * Inserts a row into TBL_DR_FOO inside an anonymous PL/SQL block that also
 * commits, and hands back the out binds of the call (the statement declares
 * DR_FOO_ID as its only out bind).
 *
 * @param {*} foo - Value bound to :foo.
 * @param {*} bar - Value bound to :bar.
 * @returns {Promise<Object>} Q promise fulfilled with `result.outBinds`;
 *   rejected with the error reported by `conn.execute`.
 */
function createHeader(foo, bar) {
  var pending = Q.defer();
  var plsql =
    'BEGIN INSERT INTO TBL_DR_FOO VALUES (NULL,:foo, :bar,' +
    '1,NULL,1,NULL,NULL,NULL,NULL) RETURNING DR_FOO_ID INTO :DR_FOO_ID; COMMIT; END;';
  var binds = {
    foo: foo,
    bar: bar,
    DR_FOO_ID: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER }
  };

  conn.execute(plsql, binds, function (err, result) {
    if (err) {
      pending.reject(err);
      return;
    }
    pending.resolve(result.outBinds);
  });

  return pending.promise;
}
/**
 * Removes old history records.
 *
 * NOTE(review): the body is elided in this listing. According to the inline
 * comment it issues a delete of records from a certain date through
 * conn.execute(..); the main script chains on its return value, so it
 * presumably returns a promise — confirm against the full implementation.
 */
function cleanHistory() {
// statement that deletes records from a certain date using conn.execute(..)
}
/*
 * Main: connect once, parse the CSV read from `batch`, process every parsed
 * entry, clean the history, and always release the connection at the end.
 */
connect().then(function () {
  var converter = new Converter({ noheader: false });

  // 'end_parsed' delivers the parsed records once the input has been read.
  converter.on('end_parsed', function (data) {
    process(data)
      .then(function () {
        return cleanHistory();
      })
      // `fin` is Q's alias for `finally`: the connection is released whether
      // the steps above succeeded or failed. The connection must be passed
      // explicitly because closeConnection's parameter shadows the
      // module-level `conn`.
      .fin(function () {
        return closeConnection(conn);
      })
      // Surface any failure that is still unhandled instead of dropping it.
      .done();
  });

  fs.createReadStream(batch).pipe(converter);
}, function (err) {
  // Only connection failures end up here.
  return console.error(err);
});