2

我在 nodejs 中使用 mongoDB 更改流,一切正常,但如果数据库关闭需要超过 10 5 秒才能启动更改流会引发超时错误,这是我的更改流观察器代码

Service.prototype.watcher = function( db ){

let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;

changeStream.on('change', next => {
    resumeToken = next._id;
    console.log('data is ', JSON.stringify(next))
    changeStream.close();
    // console.log('resumeToken is ', JSON.stringify(resumeToken))
    newChangeStream = collection.watch({ resumeAfter : resumeToken });
    newChangeStream.on('change', next => {
        console.log('insert called ', JSON.stringify( next ))
    });
});

但是在数据库端我已经处理了它,即如果数据库关闭或使用此代码重新连接

 this.db.on('reconnected', function () {
    console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
    console.warn('MongoDB disconnected!');
});

但是我无法处理更改流观察器以在数据库关闭时停止它并在重新连接数据库时重新启动它,或者是否有任何其他更好的方法可以做到这一点?

4

2 回答 2

5

您要做的是将watch()调用封装在一个函数中。然后,此函数将在出错时调用自身,以使用先前保存的恢复令牌重新观察集合。您拥有的代码中缺少的是错误处理程序。例如:

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null

run()

function watch_collection(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch({resumeAfter: resume_token})
    .on('change', data => {
      console.log(data)
      resume_token = data._id
    })
    .on('error', err => {
      console.log(new Date() + ' error: ' + err)
      watch_collection(con, coll)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_collection(con, 'test', 'test')
}

请注意,它watch_collection()包含watch()方法及其处理程序。在更改时,它将打印更改并存储恢复令牌。出错时,它会调用自己再次重新观看集合。

于 2019-02-12T02:42:56.973 回答
2

这是我开发的解决方案,只需添加 stream.on(error) 函数,这样它就不会在出现错误时崩溃,因为在重新连接数据库时重新启动流,还将每个事件的恢复令牌保存在文件中,这在应用程序时很有帮助崩溃或停止,您再次运行,在此期间如果添加了 x 条记录,因此在应用程序重新启动时,只需从文件中获取最后一个恢复令牌并从那里启动观察程序,它将获得之后插入的所有记录,因此不会有记录错过了,下面是代码

var rsToken ;
    try {
        rsToken = await this.getResumetoken()
    } catch (error) {
        rsToken = null ;
    }

    if (!rsToken)
        changeStream = collection.watch({ fullDocument: 'updateLookup' });
    else 
        changeStream = collection.watch({ fullDocument: 'updateLookup', resumeAfter : rsToken  });

    changeStream.on('change', next => {

        resumeToken = next._id;
        THIS.saveTokenInfile(resumeToken)

        cs_processor.process( next )


    });  
    changeStream.on('error', err => {
        console.log('changestream error ')
    })
于 2019-02-12T08:47:37.330 回答