
I'm trying to download a file from an S3 bucket and split it into chunks of 500,000 lines, with each chunk saved as a separate file.

The code I've written below keeps crashing with this error:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
data-import-engine_1      | 
data-import-engine_1      | <--- Last few GCs --->
data-import-engine_1      | 
data-import-engine_1      | [298:0x5637f5f1d000]   629519 ms: Mark-sweep 1452.6 (1633.7) -> 1452.5 (1596.2) MB, 1100.7 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | [298:0x5637f5f1d000]   630627 ms: Mark-sweep 1452.5 (1596.2) -> 1452.5 (1591.7) MB, 1107.6 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | 
data-import-engine_1      | 
data-import-engine_1      | <--- JS stacktrace --->
data-import-engine_1      | 
data-import-engine_1      | ==== JS stack trace =========================================
data-import-engine_1      | 
data-import-engine_1      | Security context: 0xee0238255e9 <JSObject>
data-import-engine_1      |     1: _copyArray [/home/nodeusr/app/node_modules/denque/index.js:~395] [pc=0x33c25bba7a4b](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>,fullCopy=0x20ed2d882371 <true>)
data-import-engine_1      |     2: _growArray [/home/nodeusr/app/node_modules/denque/index.js:416] [bytecode=0x48d2d8f8429 offset=19](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>)
data-import-engine_1      |     3: /* anonymous */ [/home/nodeusr/app/node_modules/ioredis/built...

I'm running this code in a Docker container based on alpine-node:9. I'm using Bull Queue to process these jobs in a sandboxed process (https://github.com/OptimalBits/bull#documentation).

I've already tried increasing the memory available to the Docker engine and raising the Node process's memory limit, but I haven't been able to solve the problem.
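For context, this is roughly how the processor module shown below is registered with Bull and how I raised the heap limit; the queue name, Redis URL, and processor path here are placeholders rather than my exact setup:

// Rough sketch only: queue name, Redis URL and processor path are placeholders.
const Queue = require('bull')

const importQueue = new Queue('data-import', process.env.REDIS_URL || 'redis://redis:6379')

// Passing a file path makes Bull run the processor in a separate (sandboxed) process.
importQueue.process(__dirname + '/processors/s3-split.js')

// The heap limit was raised by changing the container's start command, e.g.:
// node --max-old-space-size=2048 index.js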

const AWS = require('aws-sdk')
const fs = require('fs')
// var LineByLineReader = require('line-by-line')
const es = require('event-stream')

var s3 = new AWS.S3({
  region: process.env.DEFAULT_REGION || 'eu-west-2',
  accessKeyId: process.env.STORAGE_API_KEY || 'somekey',
  secretAccessKey: process.env.STORAGE_API_SECRET || 'somesecret',
  endpoint: (process.env.STORAGE_HOST && process.env.STORAGE_PORT) ? process.env.STORAGE_HOST + ':' + process.env.STORAGE_PORT : 'http://localstack:4572'
})

module.exports = (job) => {
  var dsConfig = job.data.dsConfig
  var totalBytes = 0

  var params = {
    Bucket: process.env.STORAGE_BUCKET_NAME || 'fcd_bucket',
    Key: dsConfig.resourceId
  }

  return new Promise((resolve, reject) => {
    s3.headObject(params, function (err, data) {
      if (err) reject(err)

      totalBytes = data.ContentLength
      var d = job.data
      if (typeof (job.data.progress) === 'undefined') {
        d.progress = 0
      }
      if (typeof (job.data.params) === 'undefined') {
        params.Range = 'bytes=0-' + totalBytes.toString()
        d.params = params
      }
      let progress = d.progress

      var chunkCounter = 0
      var totalCounter = 0
      var rowCounter = 0
      var chunkSize = 500000

      const filewriter = []
      filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '.tmp', {
        flags: 'a', // 'a' means appending (old data will be preserved)
        encoding: 'utf8'
      })
      var outputFiles = ['./tmp/' + d.procId + '.tmp']

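      // Append each line to the current chunk file, roll over to a new chunk file
      // once rowCounter exceeds chunkSize, and record byte progress on the job so
      // that the import can resume from the last processed offset.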
      function writeLine (line) {
        if (totalCounter > 0) filewriter[chunkCounter].write(line)
        if (rowCounter > chunkSize) {
          rowCounter = 0
          chunkCounter++
          filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '-' + chunkCounter + '.tmp', {
            flags: 'a', // 'a' means appending (old data will be preserved)
            encoding: 'utf8'
          })
          outputFiles.push('./tmp/' + d.procId + '-' + chunkCounter + '.tmp')
        }
        rowCounter++
        totalCounter++
        progress += Buffer.byteLength(line, 'utf-8')
        d.params.Range = 'bytes=' + progress.toString() + '-' + totalBytes.toString()
        d.progress = progress
        job.progress(parseFloat(progress / totalBytes).toFixed(3) * 100)
        // pipeline.resume()
      }

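      // Stream the object from S3, split it on line boundaries and write each line out.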
      s3.getObject(params).createReadStream({
        encoding: 'utf8'
      }).pipe(es.split(/(\r?\n)/)).pipe(es.map((line, callback) => {
        callback(null, writeLine(line))
      }))
        .on('error', err => {
          job.update(d).then(() => {
            console.log('Error occurred during Job ' + job.id + ' execution, progress data stored so to restart from the same point')
          }).catch(err => {
            console.log(err)
          })
          reject(err)
        })
        .on('end', () => {
          filewriter.forEach(writer => {
            writer.end()
          })
          d.tempFiles = outputFiles
          job.update(d).then(() => {
            resolve()
          }).catch(err => {
            reject(err)
          })
        })
    })
  })
}

Do you have any suggestions on how to fix this?

Thanks, Salvo


1 Answer


Use CloudFront with partial (Range) requests for the object:

More details: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/RangeGETs.html
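For example, a ranged GET with the AWS SDK for JavaScript looks roughly like this; the key and byte range are only illustrative, and the bucket name is taken from the question:

// Illustrative sketch: request a single byte range instead of the whole object.
const AWS = require('aws-sdk')
const s3 = new AWS.S3()

s3.getObject({
  Bucket: 'fcd_bucket',      // bucket name from the question
  Key: 'some-resource-id',   // placeholder key
  Range: 'bytes=0-1048575'   // first 1 MiB only
}, (err, data) => {
  if (err) return console.error(err)
  console.log('received', data.Body.length, 'bytes')
})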

Answered 2020-03-06T22:16:20.427