1

我正在尝试通过设置更改流并(使用ws websocket 库)将这些更改转发到连接的客户端来编写一些与当前正在运行的 MongoDB 数据库一起使用的 nodejs 代码。

我按照本指南创建了初始的 mongo changestream 代码,该代码正在运行:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch(
    [{
        $match: {
            $or: [
                { operationType: "update" },
                { operationType: "insert" }
            ]
        }
    }],
    {
        fullDocument: 'updateLookup'
    }
);

pollStream(changeStreamCursor);
out each change as it comes in
function pollStream(cursor) {
    while (!cursor.isExhausted()) {
        if (cursor.hasNext()) {
            change = cursor.next();
            print(JSON.stringify(change));
        }
    }
    pollStream(cursor);
}

然后我用 mongo 脚本运行这个文件:

~/Mongo/bin/mongo change-stream-socket-server.js

所以,我基本上想使用上面的这段代码,但添加 websockets 并调用,ws.send()而不是仅仅将更改记录到控制台。

不过,问题是,我似乎无法在 mongo 脚本执行的 javascript 文件中使用“require”。有没有办法让它添加ws依赖或者能够使用require?

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8040 });

conn = new Mongo("mongodb://localhost:27017/");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch(
    [{
        $match: {
            $or: [
                { operationType: "update" },
                { operationType: "insert" }
            ]
        }
    }],
    {
        fullDocument: 'updateLookup'
    }
);



console.log('also setting up websockets...')

wss.on('connection', function connection(ws) {

        wss.clients.forEach(function each(client) {

        pollStream(changeStreamCursor);

        function pollStream(cursor) {

            while (!cursor.isExhausted()) {
                if (cursor.hasNext()) {
                    change = cursor.next();
                    print(JSON.stringify(change));
                    client.send(JSON.stringify(change))
                }
            }
            pollStream(cursor);
        }

    });
});

console.log('server running, waiting for connections...')

当我用 mongo 运行它时,我得到一个错误的输出,“require is not defined”:

MongoDB shell 版本 v3.6.4 连接到:mongodb://127.0.0.1:27017 MongoDB 服务器版本:3.6.4 2018-10-28T21:23:53.515-0400 E QUERY
[thread1] ReferenceError: require is not defined : @change -stream-socket-server.js:2:7 加载失败:change-stream-socket-server.js

如果我尝试仅使用“节点”和文件运行它,则会收到未定义 Mongo 的错误。我尝试使用下面的代码使用“MongoClient”连接到它,但这对我也不起作用。

const MongoClient = require('mongodb').MongoClient;
const assert = require('assert');
const url = 'mongodb://localhost:27017';
const dbName = 'demo';
const client = new MongoClient(url);

client.connect(function(err) {
    assert.equal(null, err);
    console.log("Connected successfully to server");

    const db = client.db('demo');

    const changeStreamCursor = db.collection('stock').watch();

    pollStream(changeStreamCursor);

    function pollStream(cursor) {
    while (!cursor.isExhausted()) {
        if (cursor.hasNext()) {
            change = cursor.next();
            print(JSON.stringify(change));
        }
    }
    pollStream(cursor);
}

});

上面的代码给了我一个错误的输出,“cursor.isExhausted 不是一个函数”:

(node:4590) DeprecationWarning:当前的 URL 字符串解析器已被弃用,并将在未来的版本中删除。要使用新的解析器,请将选项 { useNewUrlParser: true } 传递给 MongoClient.connect。成功连接到服务器 /Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:466 throw err; ^

TypeError:cursor.isExhausted 不是 /Users/jim/Git 的 pollStream (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/basic-mongo-changesream-watcher.js:68:17) 的函数-Projects/MongoDB-Change-Streams-Example/basic-mongo-changesream-watcher.js:47:2 结果(/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/ utils.js:414:17) 在 executeCallback (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/utils.js:406:9) 在错误 (/Users/jim/ Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:286:5) 在 connectCallback (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/ mongodb/lib/operations/mongo_client_ops.js:241:5) 在进程中。nextTick (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:463:7) 在 _combinedTickCallback (internal/process/next_tick.js:131:7)在 process._tickCallback (internal/process/next_tick.js:180:9)

我有什么办法可以同时使用 Mongo 变更流和 ws websocket 库吗?谢谢!

4

0 回答 0