0

我使用 request 模块下载一个包含 .csv 文件的 zip 文件,然后通过管道(pipe)经由 unzip 和 split 模块读取内容,再使用 mongoose-object-stream 模块解析并将结果写入 MongoDB。
我的代码:

//index.js
var request = require('request');
var bun = require('bun');
var split = require('split');
var unzip = require('./lib/unzip');
var tomongo = require('./lib/tomongo');

var pipeline = bun([ unzip(), split()]);

// Wire download -> unzip/split -> Mongo writer with stream.pipeline rather than
// chained .pipe(): .pipe() does not forward an error from one stage to the next
// (e.g. a network failure in request), and leaves the other stages open.
// stream.pipeline reports the first error from ANY stage to the callback and
// tears the remaining streams down.
var streamPipeline = require('stream').pipeline;

streamPipeline(
  request.get( "http://someurl/somefile.zip" ),
  pipeline,
  tomongo(),
  function( err ){
    if( err ){
      console.error( 'import failed:', err );
      // Exit non-zero so a shell/cron/CI caller can detect the failure.
      process.exitCode = 1;
      return;
    }
    // NOTE(review): tomongo.js opens a mongoose connection at load time and
    // never closes it; that open connection presumably keeps the process
    // alive past this point. Close it (mongoose.disconnect()) here once the
    // writes are known to be complete.
    console.log( 'import stream finished' );
  }
);

//tomongo.js
var mySchema = require('../schema.json');
var through = require('through2');
var mos = require('mongoose-object-stream');
var mongoose = require('mongoose');
var models = require('../models')

// Open the mongoose connection as soon as this module is loaded and report
// how the initial attempt went.
// NOTE(review): nothing in this module ever calls mongoose.disconnect(), so
// this open connection presumably keeps the Node process running after the
// import has finished.
const dbpath = "mongodb://localhost:27017/test";
const mongo = mongoose.connect(dbpath, { useNewUrlParser: true });

const reportConnected = () => console.log('mongoDB connected');
const reportConnectError = (err) => console.log('err', err);
mongo.then(reportConnected).catch(reportConnectError);

// Log every later 'error' event raised on the default connection.
var db = mongoose.connection;
const logConnectionError = (...details) => console.error('connection error:', ...details);
db.on('error', logConnectionError);

// Module-wide writable stream fed with row objects for `models.books`
// (presumably persisted as documents by mongoose-object-stream).
var modelStream = new mos(models.books);

/**
 * Create an object-mode through2 stream that parses each incoming line into a
 * row object and writes it to a mongoose-object-stream for `models.books`.
 *
 * Each line is split on tab characters and zipped with the column names from
 * `../schema.json`. This assumes that file is an array of column names in file
 * order; TODO confirm. Empty lines are skipped. Nothing is pushed downstream;
 * rows are only handed to the model stream.
 *
 * Assumes mongoose-object-stream is a standard Writable (write/end/'drain'/
 * 'finish'); TODO confirm against the package version in use.
 *
 * @returns {Transform} object-mode transform. When its input ends, the flush
 *   step ends the model stream and waits for that stream's 'finish' before
 *   completing. Model-stream errors destroy the returned stream with that error.
 */
function parser(){

  const columns = mySchema;
  // A fresh model stream per parser() call: it is ended in flush(), so a
  // shared module-level instance could not be reused by a second call.
  const modelStream = new mos(models.books);

  const parseandwrite = function( chunk, _, cb ){
    // split() can emit '' for blank lines; skip them before doing any work.
    if( !chunk ){
      return cb();
    }
    const row = {};
    chunk.toString('utf-8').split('\t').forEach( function( cell, i ){
      row[ columns[ i ] ] = ( cell || '' ).trim();
    });
    // Honour backpressure: when the model stream's buffer is full, wait for
    // 'drain' before accepting the next line.
    if( modelStream.write( row ) ){
      cb();
    } else {
      modelStream.once( 'drain', function(){ cb(); } );
    }
  };

  // Runs after the last line: end the model stream so it can emit 'finish'
  // once every buffered row has been processed, and only then complete.
  const flush = function( cb ){
    modelStream.once( 'finish', function(){ cb(); } );
    modelStream.end();
  };

  const stream = through.obj( parseandwrite, flush );
  // Surface model-stream failures on the returned stream instead of letting
  // an 'error' event with no listener crash the process.
  modelStream.on( 'error', function( err ){ stream.destroy( err ); } );
  return stream;
}

module.exports = parser;

我想在流结束、并且所有记录都已存入数据库之后执行一些操作(例如退出进程)。

我尝试在管道末尾添加 .on('finish', function(){process.exit()}) 或 .on('end', function(){process.exit()}),但 Node 进程仍然一直运行,没有退出。

4

2 个回答

0

搞定了!through2 流必须先注册 .on("data", function(){}) 监听器,.on("end") 才会触发:可读流只有在数据被完全消费之后才会发出 "end" 事件。现在进程能够优雅地断开数据库连接并退出。

var request = require('request');
var bun = require('bun');
var split = require('split');
var unzip = require('./lib/unzip');
var tomongo = require('./lib/tomongo');

/**
 * Download the zip at `url`, unzip and split it into lines, and parse each line
 * with tomongo(). Every emitted row is saved as an `aModel` document.
 *
 * NOTE(review): `aModel` is not defined in this snippet. It is presumably a
 * mongoose model compiled elsewhere, whose save() returns a promise when
 * called without a callback. TODO confirm.
 *
 * @param {string} url - address of the zip archive to import
 * @returns {Promise<void>} resolves once the parsed stream has ended AND every
 *   save() has completed; rejects on the first stream or save error.
 */
function streamToDB(url) {
    // Build the unzip/split stage per call: streams cannot be reused once
    // they have ended, so a shared module-level instance breaks a second call.
    var pipeline = bun([unzip(), split()]);
    return new Promise((resolve, reject) => {
        var saves = [];
        request.get(url)
            // .pipe() does not forward errors, so listen on every stage.
            .on("error", reject)
            .pipe(pipeline)
            .on("error", reject)
            .pipe(tomongo())
            .on("data", function(data){
                var save = new aModel( data ).save();
                // Attach a handler now so a failed save rejects immediately
                // instead of surfacing as an unhandled rejection.
                save.catch(reject);
                saves.push(save);
            })
            .on("error", reject)
            .on("end", function(){
                // 'end' only means parsing finished; wait for in-flight saves
                // so the caller does not disconnect while writes are pending.
                Promise.all(saves).then(function(){ resolve(); }, reject);
            });
    });
}

// The original snippet used `mongoose` without requiring it.
var mongoose = require('mongoose');

// Connect, run the import, then always disconnect. An open MongoDB connection
// keeps the Node process alive, so it must be closed for the script to exit.
mongoose.connect("mongodb://localhost:27017/test", {
    useNewUrlParser: true
}).then(() => {
    console.log('mongoDB connected');
    return streamToDB("http://someurl/somefile.zip");
}).catch((err) => {
    console.error('err', err);
    // Report the failure to the shell/scheduler instead of exiting with 0.
    process.exitCode = 1;
}).then(() => {
    return mongoose.disconnect();
}).catch((err) => {
    // Without this, a failed disconnect would be an unhandled rejection.
    console.error('disconnect failed', err);
    process.exitCode = 1;
});

//tomongo.js
var parseandwrite = function( chunk, _, cb ){
  // through2 transform: turn one tab-separated line into an object keyed by
  // the names in `columns`, and push it downstream.
  // NOTE(review): `columns` is not defined in this excerpt; presumably the
  // column-name array from ../schema.json, as in the question's parser().
  var text = chunk.toString('utf-8');
  var record = text.split('\t').reduce(function( acc, value, idx ){
    acc[ columns[ idx ] ] = ( value || '' ).trim();
    return acc;
  }, {});
  // Empty lines ('') are parsed but not pushed.
  if( chunk ){
    this.push( record );
  }
  cb();
};

于 2018-08-10T03:43:33.087 回答
0

假设您的 parser 方法不是问题所在,我建议将数据库连接逻辑移到 index.js 中:您应该在开始向数据库流式写入数据之前先连接数据库。如果将流逻辑包装在一个 Promise 中,就可以在同一个 Promise 链中处理数据库的连接和断开逻辑。

下面是一个大致的实现示例:

var Promise = require('bluebird');
var mongoose = require('mongoose');
var MongooseObjectStream = require('mongoose-object-stream');
var request = require('request');
var split = require('split');
var through = require('through2');
var unzip = require('unzip-stream');

/**
 * Download the zip at `url`, stream the entry named 'file_with_content' line by
 * line, convert each non-empty line to an object, and write it to MongoDB via
 * mongoose-object-stream. All other zip entries are drained and ignored.
 *
 * @param {string} url - address of the zip archive
 * @returns {Promise<void>} (bluebird) resolves when the Mongo writer stream
 *   finishes; rejects on an error from ANY stage (download, unzip, parse, write).
 */
function streamToDB(url) {
    // stream.pipeline forwards errors from every stage and tears the rest
    // down; chained .pipe() only reported errors from the last stream.
    var pipeline = require('stream').pipeline;
    return new Promise((resolve, reject) => {
        pipeline(
            request.get(url),
            unzip.Parse(),
            through.obj(function (entry, enc, cb) {
                if (entry.path === 'file_with_content') {
                    // Re-emit the wanted entry's bytes; the arrow function
                    // keeps `this` bound to this through2 stream.
                    entry.on('end', cb)
                        .on('error', cb)
                        .on('data', (data) => this.push(data));
                } else {
                    entry.autodrain()
                        .on('error', cb)
                        .on('finish', cb);
                }
            }),
            split(),
            through.obj((line, enc, cb) => {
                // split() can emit '' for blank lines; don't store them.
                if (line === '') {
                    return cb();
                }
                cb(null, line.split('\t')); // Convert to "real" object here
            }),
            new MongooseObjectStream(mongoose, 'Model', {}, { strict: false }),
            (err) => (err ? reject(err) : resolve())
        );
    });
}

// Connect, run the import, and always disconnect (even when the import fails),
// because an open connection keeps the Node process alive.
mongoose.connect('mongodb://localhost:27017/test', {
    useNewUrlParser: true,
    promiseLibrary: Promise
}).then(() => {
    return streamToDB('http://someurl/somefile.zip')
        .finally(() => mongoose.disconnect());
}).catch((err) => {
    console.error(err);
    // Exit non-zero so callers (shell, cron, CI) can detect the failure.
    process.exitCode = 1;
});
于 2018-08-09T22:11:57.057 回答