0

我想处理 10,000,000 行

这是我的代码:

stmt.each('select * from very_big_table',function(err,rows,next){
      console.log('This can take 2 seconds'
})

问题是:它会吃掉我所有的内存,还是会一行一行地从硬盘读取?

4

2 回答 2

1

我最近刚刚创建了这个帮助类来处理流:

import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { Database, Statement } from 'sqlite'
import { Readable } from 'stream'

/**
 * Based on: https://gist.github.com/rmela/a3bed669ad6194fb2d9670789541b0c7
 */
export class SqliteReadable<T = any> extends Readable implements ReadableTyped<T> {
  constructor(private stmt: Statement) {
    super( { objectMode: true } );

    // might be unnecessary
    // this.on( 'end', () => {
    //   console.log(`SQLiteStream end`)
    //   void this.stmt.finalize()
    // })
  }

  static async create<T = any>(db: Database, sql: string): Promise<SqliteReadable<T>> {
    const stmt = await db.prepare(sql)
    return new SqliteReadable<T>(stmt)
  }

  /**
   * Necessary to call it, otherwise this error might occur on `db.close()`:
   * SQLITE_BUSY: unable to close due to unfinalized statements or unfinished backups
   */
  async close(): Promise<void> {
    await this.stmt.finalize()
  }

  // count = 0 // use for debugging

  override async _read(): Promise<void> {
    // console.log(`read ${++this.count}`) // debugging
    try {
      const r = await this.stmt.get<T>()
      this.push(r || null)
    } catch(err) {
      console.log(err) // todo: check if it's necessary
      this.emit('error', err)
    }
  }

}

于 2021-08-21T13:15:11.137 回答
0

两个都。statement.each如果函数是快速和同步的,则不会使用任何额外的内存,但是对于异步函数,它将以尽可能快的速度启动所有异步处理,因为它可以从磁盘加载数据,这会占用你所有的 RAM。

正如您在此处发布的问题https://github.com/mapbox/node-sqlite3/issues/686中所述,可以使用Statement.get(). Statement.get()没有任何参数将获取下一行。因此,您可以像这样实现自己的异步版本:

function asyncEach(db, sql, parameters, eachCb, doneCb) {
  let stmt;

  let cleanupAndDone = err => {
    stmt.finalize(doneCb.bind(null, err));
  };

  stmt = db.prepare(sql, parameters, err => {
    if (err) {
      return cleanupAndDone(err);
    }

    let next = err => {
      if (err) {
        return cleanupAndDone(err);
      }

      return stmt.get(recursiveGet);
    };

    // Setup recursion
    let recursiveGet = (err, row) => {
      if (err) {
        return cleanupAndDone(err);
      }

      if (!row) {
        return cleanupAndDone(null);
      }

      // Call the each callback which must invoke the next callback
      return eachCb(row, next);
    }

    // Start recursion
    stmt.get(recursiveGet);
  });
}

请注意,与内置的语法略有不同Statement.each,错误仅发送到最后一个回调并且不支持可选参数。

另请注意,这比普通Statement.each函数慢,可以通过发出多个get调用来改进,以便下一行在next调用时等待,但要遵循该代码要困难得多。

使用片段的示例:

let rowCount = 0;
asyncEach(db, 'SELECT * from testtable', [], (row, next) => {
  assert.isObject(row);
  rowCount++;
  return next();
}, err => {
  if (err) {
    return done(err);
  }
  assert.equal(rowCount, TEST_ROW_COUNT);
  done();
});

于 2016-11-05T21:53:33.637 回答