我想处理 10,000,000 行
这是我的代码:
stmt.each('select * from very_big_table',function(err,rows,next){
console.log('This can take 2 seconds'
})
问题是:它会吃掉我所有的内存,还是会一行一行地从硬盘读取?
我想处理 10,000,000 行
这是我的代码:
stmt.each('select * from very_big_table',function(err,rows,next){
console.log('This can take 2 seconds'
})
问题是:它会吃掉我所有的内存,还是会一行一行地从硬盘读取?
我最近刚刚创建了这个帮助类来处理流:
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { Database, Statement } from 'sqlite'
import { Readable } from 'stream'
/**
* Based on: https://gist.github.com/rmela/a3bed669ad6194fb2d9670789541b0c7
*/
export class SqliteReadable<T = any> extends Readable implements ReadableTyped<T> {
constructor(private stmt: Statement) {
super( { objectMode: true } );
// might be unnecessary
// this.on( 'end', () => {
// console.log(`SQLiteStream end`)
// void this.stmt.finalize()
// })
}
static async create<T = any>(db: Database, sql: string): Promise<SqliteReadable<T>> {
const stmt = await db.prepare(sql)
return new SqliteReadable<T>(stmt)
}
/**
* Necessary to call it, otherwise this error might occur on `db.close()`:
* SQLITE_BUSY: unable to close due to unfinalized statements or unfinished backups
*/
async close(): Promise<void> {
await this.stmt.finalize()
}
// count = 0 // use for debugging
override async _read(): Promise<void> {
// console.log(`read ${++this.count}`) // debugging
try {
const r = await this.stmt.get<T>()
this.push(r || null)
} catch(err) {
console.log(err) // todo: check if it's necessary
this.emit('error', err)
}
}
}
两个都。statement.each
如果函数是快速和同步的,则不会使用任何额外的内存,但是对于异步函数,它将以尽可能快的速度启动所有异步处理,因为它可以从磁盘加载数据,这会占用你所有的 RAM。
正如您在此处发布的问题https://github.com/mapbox/node-sqlite3/issues/686中所述,可以使用Statement.get()
. Statement.get()
没有任何参数将获取下一行。因此,您可以像这样实现自己的异步版本:
function asyncEach(db, sql, parameters, eachCb, doneCb) {
let stmt;
let cleanupAndDone = err => {
stmt.finalize(doneCb.bind(null, err));
};
stmt = db.prepare(sql, parameters, err => {
if (err) {
return cleanupAndDone(err);
}
let next = err => {
if (err) {
return cleanupAndDone(err);
}
return stmt.get(recursiveGet);
};
// Setup recursion
let recursiveGet = (err, row) => {
if (err) {
return cleanupAndDone(err);
}
if (!row) {
return cleanupAndDone(null);
}
// Call the each callback which must invoke the next callback
return eachCb(row, next);
}
// Start recursion
stmt.get(recursiveGet);
});
}
请注意,与内置的语法略有不同Statement.each
,错误仅发送到最后一个回调并且不支持可选参数。
另请注意,这比普通Statement.each
函数慢,可以通过发出多个get
调用来改进,以便下一行在next
调用时等待,但要遵循该代码要困难得多。
使用片段的示例:
let rowCount = 0;
asyncEach(db, 'SELECT * from testtable', [], (row, next) => {
assert.isObject(row);
rowCount++;
return next();
}, err => {
if (err) {
return done(err);
}
assert.equal(rowCount, TEST_ROW_COUNT);
done();
});