我正在使用 MongoDB 3.6 watch()的新功能,以便将数据库更新从节点服务器发送到客户端的 ajax。
我创建了一个通过 ajax 调用定期查询的 web 服务。在两个连续的 ajax 调用之间,我希望第二个获取同时发生的所有更新。我知道我必须恢复更改流,如下面的官方文档所示:https ://docs.mongodb.com/manual/changeStreams/#resume-a-change-stream 。但是我没有找到如何将其应用于我的特定需求,我的意思是,我可以在哪个回调中处理我的数据并将其发送到 web 服务响应?
这是我的服务器端代码的一部分:server.js
const pipeline = [
{
$match : {
"operationType" : "insert"
,
"fullDocument.T" : { "$exists":true}
}
},
{
$project: { "fullDocument.ts": 1,
"fullDocument.T":1}
}
];
function getLiveData(handler){
console.log("in getLiveData");
var liveArray=[];
var resumeToken;
const changeStream = dbObject.collection('status').watch(pipeline);
changeStream.hasNext(function(err, change) {
if (err) return console.log(err);
expect(err).to.equal(null);
expect(change).to.exist;
console.log("in changeStream.hasNext");
changeStream.next(function(err, change) {
if (err) return console.log(err);
expect(err).to.equal(null);
console.log("in changeStream.next");
resumeToken = change._id;
expect(change._id).to.exist;
expect(changeStream.resumeToken).to.exist;
changeStream.close(function(err) {
if (err) return console.log(err);
expect(err).to.equal(null);
console.log("in changeStream.close");
const newChangeStream = dbObject.collection('status').watch({ resumeAfter: resumeToken });
newChangeStream.next(function(err, next) {
if (err) return console.log(err);
expect(err).to.equal(null);
expect(next).to.exist;
console.log("in newChangeStream.next");
//my own code
newChangeStream.on("change", function(change) {
console.log('in change stream, change : ',change);
liveArray.push([change.fullDocument.ts, change.fullDocument.T]);
var response = {
"liveArray" : liveArray
};
console.log("from getLiveData : " , response);
handler(response);
});
//my own code
// Since changeStream has an implicit seession,
// we need to close the changeStream for unit testing purposes
newChangeStream.close();
});
});
});
});
}
网络服务部分:
app.get("/liveDataRequest", function(req, res){
getLiveData(function(data){
console.log("in handler", data);
res.status(200).send(data);
});
这是控制台日志,正如我们所见,我处理数据的部分永远不会被调用:
in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next
in getLiveData
in changeStream.hasNext
in changeStream.next
in changeStream.close
in newChangeStream.next