I am trying to download a file from an S3 bucket and split it into chunks of 500,000 lines, saving each chunk as a separate file.
The code I wrote below keeps crashing with this error:
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
data-import-engine_1 |
data-import-engine_1 | <--- Last few GCs --->
data-import-engine_1 |
data-import-engine_1 | [298:0x5637f5f1d000] 629519 ms: Mark-sweep 1452.6 (1633.7) -> 1452.5 (1596.2) MB, 1100.7 / 0.0 ms last resort GC in old space requested
data-import-engine_1 | [298:0x5637f5f1d000] 630627 ms: Mark-sweep 1452.5 (1596.2) -> 1452.5 (1591.7) MB, 1107.6 / 0.0 ms last resort GC in old space requested
data-import-engine_1 |
data-import-engine_1 |
data-import-engine_1 | <--- JS stacktrace --->
data-import-engine_1 |
data-import-engine_1 | ==== JS stack trace =========================================
data-import-engine_1 |
data-import-engine_1 | Security context: 0xee0238255e9 <JSObject>
data-import-engine_1 | 1: _copyArray [/home/nodeusr/app/node_modules/denque/index.js:~395] [pc=0x33c25bba7a4b](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>,fullCopy=0x20ed2d882371 <true>)
data-import-engine_1 | 2: _growArray [/home/nodeusr/app/node_modules/denque/index.js:416] [bytecode=0x48d2d8f8429 offset=19](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>)
data-import-engine_1 | 3: /* anonymous */ [/home/nodeusr/app/node_modules/ioredis/built...
I am running this code in a Docker container based on alpine-node:9. I am using Bull Queue (https://github.com/OptimalBits/bull#documentation) to process these jobs in a sandboxed process; the last few frames of the stack trace point into denque, which ioredis (the Redis client Bull uses) relies on internally.
I have already tried giving the Docker engine more memory and raising the Node process's memory limit, but neither made the problem go away.
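For context, the processor file shown below is registered with Bull roughly like this (queue name, Redis URL and file name are placeholders, not my exact values):

const Queue = require('bull')
const path = require('path')

// Passing a file path to .process() makes Bull run the exported
// (job) => Promise function in a separate (sandboxed) child process.
const importQueue = new Queue('data-import', process.env.REDIS_URL || 'redis://redis:6379')
importQueue.process(path.join(__dirname, 'splitS3File.js'))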
const AWS = require('aws-sdk')
const fs = require('fs')
// var LineByLineReader = require('line-by-line')
const es = require('event-stream')

var s3 = new AWS.S3({
  region: process.env.DEFAULT_REGION || 'eu-west-2',
  accessKeyId: process.env.STORAGE_API_KEY || 'somekey',
  secretAccessKey: process.env.STORAGE_API_SECRET || 'somesecret',
  endpoint: (process.env.STORAGE_HOST && process.env.STORAGE_PORT) ? process.env.STORAGE_HOST + ':' + process.env.STORAGE_PORT : 'http://localstack:4572'
})

module.exports = (job) => {
  var dsConfig = job.data.dsConfig
  var totalBytes = 0
  var params = {
    Bucket: process.env.STORAGE_BUCKET_NAME || 'fcd_bucket',
    Key: dsConfig.resourceId
  }
  return new Promise((resolve, reject) => {
    s3.headObject(params, function (err, data) {
      if (err) return reject(err)
      totalBytes = data.ContentLength
      var d = job.data
      if (typeof (job.data.progress) === 'undefined') {
        d.progress = 0
      }
      if (typeof (job.data.params) === 'undefined') {
        // First run: request the whole object and keep the Range in the
        // job data so progress can be tracked and persisted.
        params.Range = 'bytes=0-' + totalBytes.toString()
        d.params = params
      }
      let progress = d.progress
      var chunkCounter = 0
      var totalCounter = 0
      var rowCounter = 0
      var chunkSize = 500000
      const filewriter = []
      filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '.tmp', {
        flags: 'a', // 'a' means appending (old data will be preserved)
        encoding: 'utf8'
      })
      var outputFiles = ['./tmp/' + d.procId + '.tmp']

      // Append each line to the current chunk file, start a new chunk file
      // every `chunkSize` lines, and track the byte offset so the job data
      // always knows how far we got.
      function writeLine (line) {
        if (totalCounter > 0) filewriter[chunkCounter].write(line)
        if (rowCounter > chunkSize) {
          rowCounter = 0
          chunkCounter++
          filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '-' + chunkCounter + '.tmp', {
            flags: 'a', // 'a' means appending (old data will be preserved)
            encoding: 'utf8'
          })
          outputFiles.push('./tmp/' + d.procId + '-' + chunkCounter + '.tmp')
        }
        rowCounter++
        totalCounter++
        progress += Buffer.byteLength(line, 'utf-8')
        d.params.Range = 'bytes=' + progress.toString() + '-' + totalBytes.toString()
        d.progress = progress
        job.progress(parseFloat(progress / totalBytes).toFixed(3) * 100)
        // pipeline.resume()
      }

      s3.getObject(params).createReadStream({ encoding: 'utf8' })
        .pipe(es.split(/(\r?\n)/))
        .pipe(es.map((line, callback) => {
          callback(null, writeLine(line))
        }))
        .on('error', err => {
          job.update(d).then(() => {
            console.log('Error occurred during Job ' + job.id + ' execution, progress data stored so to restart from the same point')
          }).catch(err => {
            console.log(err)
          })
          reject(err)
        })
        .on('end', () => {
          filewriter.forEach(writer => {
            writer.end()
          })
          d.tempFiles = outputFiles
          job.update(d).then(() => {
            resolve()
          }).catch(err => {
            reject(err)
          })
        })
    })
  })
}
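In case it matters, jobs are added to the queue roughly like this, using the same importQueue object as in the registration snippet above (the values are placeholders; the field names match what the processor reads):

// Same queue as in the registration snippet above.
importQueue.add({
  dsConfig: { resourceId: 'some/key/in/the/bucket.csv' }, // S3 object key (placeholder)
  procId: 'import-1234' // used for the tmp file names (placeholder)
})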
Do you have any suggestions on how to fix this?
Thanks, Salvo