0

我觉得这里的事情已经不正常了。我想建立与 mySQL 数据库的连接。然后我想读入一个文件。我想逐行获取名称并运行查询。我会假设我的返回承诺的 sqlSelectQuery 函数会等待承诺解决,然后再移动到下一行。我在这里想念什么?

const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

connection.connect((err) => {
    if(err){
      console.log('Error connecting to Db');
      return;
    }
    console.log('Connection established');
  });

fs.createReadStream(path.resolve(__dirname,'data.csv'))
    .pipe(csv.parse({ headers: true }))
    .on('error', error => console.error("error", error))
    .on('data', row => { // need to get this to block
        sqlSelectQuery(row).then(result => console.log("result: ", result))
    })
    .on('end', rowCount => console.log(`Parsed ${rowCount} rows`));




const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("inside promise");
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err,rows) => {
            let result = {};
            if(err) reject(err);
            if (rows.length === 1){
                let res = rows[0];
                result = {
                    business_name: res.business_name,
                    loan_range: res.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                };
                resolve(result);
            } else {
                result = {
                    business_name: row.BorrowerName,
                    loan_range: "",
                    loan_amount: "",
                    unique: rows.length
                };
                resolve(result);
            }
        });
    })
}

my console looks like this
inside promise
inside promise  //20 times (I have 20 rows)
Parsed 20 rows
Connection established
result:  {....}
result: {...}....
4

2 回答 2

1

我找到了这个答案。我需要 在 createReadStream 中添加暂停和恢复 nodejs async await

.on('data', async (row) => { // need to get this to block
        stream.pause();
        await sqlSelectQuery(row).then(result => console.log("result: ", result))
        stream.resume();
    })

现在的问题是我的 .on('end') 在最后一行之前运行。

于 2021-03-10T14:21:25.887 回答
0

您可以将每一行添加到 rowsToProcess 数组,然后在读取文件数据后,逐行处理:

const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

connection.connect((err) => {
    if (err) {
        console.error('Error connecting to Db:', err);
        return;
    } 
    console.log('Connection established');
    const rowsToProcess = [];
    fs.createReadStream(path.resolve(__dirname,'data.csv'))
        .pipe(csv.parse({ headers: true }))
        .on('error', error => console.error("error", error))
        .on('data', row => {
            // Add row to process.
            rowsToProcess.push(row);
        })
        .on('end', async rowCount => { 
            await processRows(rowsToProcess);
            console.log("processRows: complete.")
        })
});

async function processRows(rowsToProcess) {
    console.log(`Read ${rowsToProcess.length} row(s) from csv file...`)
    for (let i = 0; i < rowsToProcess.length; i++) {
        console.log(`processing row ${i+1} of ${rowsToProcess.length}...`);
        let result = await sqlSelectQuery(rowsToProcess[i])
        console.log(`row ${i+1} result:`, result);
    }
}

const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("Processing row:", row);
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err,rows) => {
            let result = {};
            if(err) reject(err);
            if (rows.length === 1){
                let res = rows[0];
                result = {
                    business_name: res.business_name,
                    loan_range: res.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                };
                resolve(result);
            } else {
                result = {
                    business_name: row.BorrowerName,
                    loan_range: "",
                    loan_amount: "",
                    unique: rows.length
                };
                resolve(result);
            }
        });
    })
}
于 2021-03-10T14:15:36.917 回答