0

我有一个 ETL 查询,它需要我读取大量行,然后对它们应用一些转换并将它们保存回 Postgres 中的单独表中。我正在使用 pg-query-stream 并且我计划在 Bullmq 作业中运行下面的测试功能。

如何测量下面给定流的进度(当前处理的行数/总行数)?

const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');

function test() {
  const pool = new Pool({
    database: process.env.POSTGRES_DB,
    host: process.env.POSTGRES_HOST,
    port: +process.env.POSTGRES_PORT,
    password: process.env.POSTGRES_PASSWORD,
    ssl: process.env.POSTGRES_SSL === 'true',
    user: process.env.POSTGRES_USER,
  });
  const query = `
    SELECT 
        feed_item_id, 
        title, 
        summary, 
        content 
    FROM 
        feed_items 
    ORDER BY 
        pubdate DESC, 
        feed_item_id
    LIMIT 50
    `;

  pool.connect((err, client, done) => {
    if (err) throw err;
    const queryStream = new QueryStream(query, [], {
      batchSize: 200,
    });
    const stream = client.query(queryStream);
    console.log(stream);
    stream.pipe(JSONStream.stringify());
    stream.on('error', (error) => {
      console.error(error);
      done();
    });
    stream.on('end', () => {
      console.log('stream has ended');
      done();
    });
    stream.on('data', async (row) => {
      stream.pause();
      console.log('data received', row.feed_item_id);
      //   const progress = index / ???
      //   Simulate async task
      await timeout(10);
      stream.resume();
    });
  });
}

test();
4

0 回答 0