我有一个 ETL 任务:需要从 Postgres 中读取大量行,对它们做一些转换,再把结果保存到 Postgres 的另一张表中。我使用的是 pg-query-stream,并计划在 BullMQ 作业中运行下面的 test 函数。
如何测量下面给定流的进度(当前处理的行数/总行数)?
const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');
/**
 * Streams rows from `feed_items`, processes them strictly one at a time and
 * reports progress as "rows processed / total rows".
 *
 * The total is obtained up front by wrapping the same SELECT in a
 * `count(*)` query, so the denominator always matches what the stream will
 * yield (including the LIMIT). It is a snapshot: rows inserted or deleted
 * between the count and the stream can make the two drift slightly.
 *
 * Inside a BullMQ processor the callback can forward to the job, e.g.
 * `test(({ ratio }) => job.updateProgress(Math.round(ratio * 100)))`.
 *
 * @param {(progress: { processed: number, total: number, ratio: number }) => (void|Promise<void>)} [onProgress]
 *   Invoked after every processed row; may be async (it is awaited).
 *   Defaults to logging the progress to the console.
 * @returns {Promise<number>} Resolves with the number of rows processed.
 * @throws Rejects with the connection, query or processing error after the
 *   client and the pool have been cleaned up.
 */
async function test(
  onProgress = ({ processed, total }) => {
    console.log(`progress: ${processed}/${total}`);
  },
) {
  const pool = new Pool({
    database: process.env.POSTGRES_DB,
    host: process.env.POSTGRES_HOST,
    port: Number(process.env.POSTGRES_PORT),
    password: process.env.POSTGRES_PASSWORD,
    ssl: process.env.POSTGRES_SSL === 'true',
    user: process.env.POSTGRES_USER,
  });
  const query = `
SELECT
feed_item_id,
title,
summary,
content
FROM
feed_items
ORDER BY
pubdate DESC,
feed_item_id
LIMIT 50
`;

  // Local stand-in for the real per-row work, so the block does not depend
  // on a `timeout` helper defined somewhere else.
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  let client;
  let failure;
  try {
    client = await pool.connect();

    // count(*) is a bigint, which pg returns as a string; convert explicitly.
    const countResult = await client.query(
      `SELECT count(*) AS total FROM (${query}) AS counted`,
    );
    const total = Number(countResult.rows[0].total);

    const stream = client.query(
      new QueryStream(query, [], {
        batchSize: 200,
      }),
    );

    // `for await` pulls the next row only after the loop body has finished,
    // which gives real backpressure. The previous pause()/resume() calls in
    // an async 'data' handler could let 'end' fire while the last row was
    // still being processed, and the pipe into an unread JSONStream worked
    // against the manual pause.
    let processed = 0;
    for await (const row of stream) {
      console.log('data received', row.feed_item_id);
      // Simulate async task
      await sleep(10);
      processed += 1;
      await onProgress({
        processed,
        total,
        // An empty result set counts as complete rather than dividing by zero.
        ratio: total === 0 ? 1 : processed / total,
      });
    }

    console.log('stream has ended');
    return processed;
  } catch (error) {
    failure = error;
    console.error(error);
    throw error;
  } finally {
    if (client) {
      // Passing the error makes the pool discard the client instead of
      // reusing a connection that may still have a half-read cursor.
      client.release(failure);
    }
    // Without this the pool keeps the process alive after the work is done.
    await pool.end();
  }
}

test().catch(() => {
  // The error was already logged inside test(); just signal failure.
  process.exitCode = 1;
});