1

我已经使用 node-oracledb 几个月了,到目前为止我已经成功地实现了我所需要的。

我目前正在开发一个搜索应用程序,它可能会从一次调用中返回大约 200 万行数据。为了确保我不会从浏览器和服务器断开连接,我想我会尝试 queryStream 以便有一个持续的数据流返回客户端。

我按原样实现了queryStream示例,这对几十万行运行良好。但是,当返回的行数大于一百万时,Node 会耗尽内存。通过记录和观察客户端和服务器日志事件,我可以看到客户端在发送和接收的行方面落后于服务器。因此,看起来 Node 正在崩溃,因为它正在缓冲大量数据。

值得注意的是,此时,我的 selectstream 实现位于通过 Express 调用的 req/res 函数中。

要返回数据,我会这样做......

stream.on('data', function (data) {

    rowcount++;

    let obj = new myObjectConstructor(data);
    res.write(JSON.stringify(obj.getJson());

});

我一直在阅读有关流和管道如何帮助处理流的信息,所以我想做的是能够将查询的结果通过管道传输到 a) 流的帮助和 b) 能够在将结果发送回客户端之前,将结果传递给其他函数。

例如

function getData(req, res){

    var stream = myQueryStream(connection, query);

    stream
        .pipe(toSomeOtherFunction)
        .pipe(yetAnotherFunction)
        .pipe(res);

}

我花了几个小时试图找到允许我管道结果的解决方案或示例,但我被困住了,需要一些帮助。

如果我遗漏了一些明显的东西,我深表歉意,但我仍在掌握 Node 尤其是流。

提前致谢。

4

2 回答 2

1

这里有一点阻抗不匹配。queryStream API 发出多行 JavaScript 对象,但您想要流式传输到客户端的是 JSON 数组。您基本上必须在开头添加一个左括号,在每行之后添加一个逗号,并在末尾添加一个右括号。

我将向您展示如何在直接使用驱动程序的控制器中执行此操作,而不是像我在本系列中提倡的那样使用单独的数据库模块。

const oracledb = require('oracledb');

async function get(req, res, next) {
  try {
    const conn = await oracledb.getConnection();

    const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

    res.writeHead(200, {'Content-Type': 'application/json'});

    res.write('[');

    stream.on('data', (row) => {
      res.write(JSON.stringify(row));
      res.write(',');
    });

    stream.on('end', () => {
      res.end(']');
    });

    stream.on('close', async () => {
      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });

    stream.on('error', async (err) => {
      next(err);

      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });
  } catch (err) {
    next(err);
  }
}

module.exports.get = get;

一旦你掌握了这些概念,你就可以使用一个可重用的 Transform 类来简化一些事情,它允许你在控制器逻辑中使用管道:

const oracledb = require('oracledb');
const { Transform } = require('stream');

class ToJSONArray extends Transform {
  constructor() {
    super({objectMode: true});

    this.push('[');
  }

  _transform (row, encoding, callback) {
    if (this._prevRow) {
      this.push(JSON.stringify(this._prevRow));
      this.push(',');
    }

    this._prevRow = row;

    callback(null);
  }

  _flush (done) {
    if (this._prevRow) {
      this.push(JSON.stringify(this._prevRow));
    }

    this.push(']');

    delete this._prevRow;

    done();
  }
}

async function get(req, res, next) {
  try {
    const toJSONArray = new ToJSONArray();
    const conn = await oracledb.getConnection();

    const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

    res.writeHead(200, {'Content-Type': 'application/json'});

    stream.pipe(toJSONArray).pipe(res);

    stream.on('close', async () => {
      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });

    stream.on('error', async (err) => {
      next(err);

      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });
  } catch (err) {
    next(err);
  }
}

module.exports.get = get;
于 2018-05-17T21:44:26.337 回答
0

无需编写自己的逻辑来创建 JSON 流,您可以使用JSONStream将对象流转换为(字符串化)JSON,然后再将其通过管道传输到目标(resprocess.stdout),这样就无需处理.on('data',...)事件。

在下面的示例中,我使用了来自节点流模块的管道而不是.pipe方法:效果相似(我认为具有更好的错误处理)。要从中获取对象oracledb.queryStream,您可以指定选项 {outFormat: oracledb.OUT_FORMAT_OBJECT}( docs )。然后,您可以对生成的对象流进行任意修改。这可以使用转换流来完成,可能使用through2-map,或者如果您需要删除或拆分行,使用 through2。下面的流process.stdout在被字符串化为 JSON 后被发送到,但您同样可以将 express 的res.

require('dotenv').config()   // config from .env file
const JSONStream = require('JSONStream')
const oracledb = require('oracledb')
const { pipeline } = require('stream')  
const map = require('through2-map')   // see https://www.npmjs.com/package/through2-map

oracledb.getConnection({
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  connectString: process.env.CONNECT_STRING
}).then(connection => {
  pipeline(
    connection.queryStream(`
      select dual.*,'test' as col1 from dual 
      union select dual.*, :someboundvalue as col1 from dual 
      `
      ,{"someboundvalue":"test5"} // binds
      ,{
        prefetchRows: 150, // for tuning
        fetchArraySize: 150, // for tuning
        outFormat: oracledb.OUT_FORMAT_OBJECT
      }
    )
    ,map.obj((row,index) => {
      row.arbitraryModification = index 
      return row
    })
    ,JSONStream.stringify() // false gives ndjson
    ,process.stdout     // or send to express's res
    ,(err) => { if(err) console.error(err) }
  )
})

// [
// {"DUMMY":"X","COL1":"test","arbitraryModification":0}
// ,
// {"DUMMY":"X","COL1":"test5","arbitraryModification":1}
// ]

于 2021-08-13T18:51:44.217 回答