I'm trying to process a .csv file with GCF (Google Cloud Functions). The file is moved into this bucket from another bucket, and I try to read it using the finalize event. (I also need to reshape the data in this file, which is why I need to read it into a buffer first.)
When I define the file manually in my local environment the code works fine, but when I switch to running it in GCF via the event, it exits without an error. Just ignore the debug logs in the code below. (The last thing it logs is 'error found', so I assume the read stream is never created correctly.)
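For context, the finalize event passes the GCS object metadata to the function as its payload. As far as I understand, it looks roughly like this (values made up):

// Rough shape of the finalize event payload (values are made up):
const exampleEvent = {
  bucket: 'ga_report',                      // bucket that received the object
  name: 'report.csv',                       // object path within the bucket
  id: 'ga_report/report.csv/1551234567890', // bucket/object/generation
  contentType: 'text/csv',
};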
Here's the code:
const { Storage } = require('@google-cloud/storage');
const Papa = require('papaparse');
const moment = require('moment');
const { BigQuery } = require('@google-cloud/bigquery');
const storage = new Storage();
const bigquery = new BigQuery();
const dataset = bigquery.dataset('toy_test');
const bucket = storage.bucket('ga_report');
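// Clients are created at module scope so warm invocations can reuse them.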
exports.readCSVContent = async (event, context) => {
  const gcsEvent = event;
  const fileName = gcsEvent.id;
  console.log(fileName);
  console.log(typeof fileName);
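  // Skip files this function wrote itself, so it does not re-trigger on its own output.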
  if (
    fileName.startsWith('processed/') ||
    fileName.startsWith('unprocessed/')
  ) {
    console.log('1');
    return;
  } else {
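    // Wrap the stream handling in a Promise so the async function waits for it.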
    return await new Promise((resolve, reject) => {
      let fileContents = Buffer.alloc(0); // new Buffer('') is deprecated
      console.log('2');
      try {
        bucket
          .file(fileName)
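          // Read only bytes 10000-20000 of the object.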
          .createReadStream({
            start: 10000,
            end: 20000
          })
          .on('error', function (err) {
            console.log('error found');
            reject('The Storage API returned an error: ' + err);
          })
          .on('data', function (chunk) {
            console.log('buffer');
            fileContents = Buffer.concat([fileContents, chunk]);
          })
          .on('end', async function () {
            console.log('end');
            let content = fileContents.toString('utf8');
            // Keep a copy of the raw content under unprocessed/.
            try {
              await bucket.file('unprocessed/' + gcsEvent.id).save(content);
            } catch (error) {
              console.log(error);
            }
            console.log('3');
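            // Parse the CSV; the transform rewrites 8-digit YYYYMMDD values in the first column as ISO timestamps.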
            const parsedCSV = Papa.parse(content, {
              transform: function (element, i) {
                // Column 0 holds 8-digit YYYYMMDD dates.
                if (i == 0 && element.length == 8) {
                  var year = element.substring(0, 4);
                  var month = element.substring(4, 6);
                  var day = element.substring(6, 8);
                  console.log('4');
                  const date = moment(year + month + day).format(
                    'YYYY-MM-DDTHH:mm:ss'
                  );
                  console.log('5');
                  return date;
                } else {
                  return element;
                }
              },
              newline: '\n',
              skipEmptyLines: true,
            });
            let parsedData = parsedCSV.data;
            console.log('6');
            // Drop the first six and the last six rows.
            parsedData = parsedData.slice(6, -6);
            console.log(parsedData);
            const jsonData = parsedData.map((value) => {
              return {
                date: value[0],
                car_model: value[1],
                configurations: value[2],
              };
            });
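            // Stream the transformed rows into BigQuery.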
            try {
              await dataset.table('my_table').insert(jsonData);
              console.log(`Inserted ${jsonData.length} rows`);
            } catch (error) {
              console.log(error);
            }
            const finalCSV = Papa.unparse(parsedData);
            const currentDateTime = moment().format('YYYY-MM-DD HH:mm:ss');
            try {
              await bucket
                .file('processed/' + currentDateTime + ' configurations.csv')
                .save(finalCSV);
              console.log(gcsEvent.id + ' is in processed/');
            } catch (error) {
              console.log(error);
            }
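            // Debug: list every file currently in the bucket.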
            const [files] = await bucket.getFiles();
            files.forEach((element) => {
              console.log(element.name);
            });
            // Resolve so the Promise (and the function invocation) can finish.
            resolve();
          });
      } catch (error) {
        console.log(error);
        reject(error);
      }
    });
  }
};
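For reference, this is roughly how I call it when testing locally (the event object is hand-written and the file name is made up):

// Hand-written event for local testing; the file name is made up.
exports.readCSVContent({ id: 'report.csv' }, {})
  .then(() => console.log('done'))
  .catch((err) => console.error(err));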