我正在尝试通过设置更改流并(使用ws websocket 库)将这些更改转发到连接的客户端来编写一些与当前正在运行的 MongoDB 数据库一起使用的 nodejs 代码。
我按照本指南创建了初始的 mongo changestream 代码,该代码正在运行:
conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;
const changeStreamCursor = collection.watch(
[{
$match: {
$or: [
{ operationType: "update" },
{ operationType: "insert" }
]
}
}],
{
fullDocument: 'updateLookup'
}
);
pollStream(changeStreamCursor);
out each change as it comes in
function pollStream(cursor) {
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
change = cursor.next();
print(JSON.stringify(change));
}
}
pollStream(cursor);
}
然后我用 mongo 脚本运行这个文件:
~/Mongo/bin/mongo change-stream-socket-server.js
所以,我基本上想使用上面的这段代码,但添加 websockets 并调用,ws.send()
而不是仅仅将更改记录到控制台。
不过,问题是,我似乎无法在 mongo 脚本执行的 javascript 文件中使用“require”。有没有办法让它添加ws依赖或者能够使用require?
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8040 });
conn = new Mongo("mongodb://localhost:27017/");
db = conn.getDB("demo");
collection = db.stock;
const changeStreamCursor = collection.watch(
[{
$match: {
$or: [
{ operationType: "update" },
{ operationType: "insert" }
]
}
}],
{
fullDocument: 'updateLookup'
}
);
console.log('also setting up websockets...')
wss.on('connection', function connection(ws) {
wss.clients.forEach(function each(client) {
pollStream(changeStreamCursor);
function pollStream(cursor) {
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
change = cursor.next();
print(JSON.stringify(change));
client.send(JSON.stringify(change))
}
}
pollStream(cursor);
}
});
});
console.log('server running, waiting for connections...')
当我用 mongo 运行它时,我得到一个错误的输出,“require is not defined”:
MongoDB shell 版本 v3.6.4 连接到:mongodb://127.0.0.1:27017 MongoDB 服务器版本:3.6.4 2018-10-28T21:23:53.515-0400 E QUERY
[thread1] ReferenceError: require is not defined : @change -stream-socket-server.js:2:7 加载失败:change-stream-socket-server.js
如果我尝试仅使用“节点”和文件运行它,则会收到未定义 Mongo 的错误。我尝试使用下面的代码使用“MongoClient”连接到它,但这对我也不起作用。
const MongoClient = require('mongodb').MongoClient;
const assert = require('assert');
const url = 'mongodb://localhost:27017';
const dbName = 'demo';
const client = new MongoClient(url);
client.connect(function(err) {
assert.equal(null, err);
console.log("Connected successfully to server");
const db = client.db('demo');
const changeStreamCursor = db.collection('stock').watch();
pollStream(changeStreamCursor);
function pollStream(cursor) {
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
change = cursor.next();
print(JSON.stringify(change));
}
}
pollStream(cursor);
}
});
上面的代码给了我一个错误的输出,“cursor.isExhausted 不是一个函数”:
(node:4590) DeprecationWarning:当前的 URL 字符串解析器已被弃用,并将在未来的版本中删除。要使用新的解析器,请将选项 { useNewUrlParser: true } 传递给 MongoClient.connect。成功连接到服务器 /Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:466 throw err; ^
TypeError:cursor.isExhausted 不是 /Users/jim/Git 的 pollStream (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/basic-mongo-changesream-watcher.js:68:17) 的函数-Projects/MongoDB-Change-Streams-Example/basic-mongo-changesream-watcher.js:47:2 结果(/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/ utils.js:414:17) 在 executeCallback (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/utils.js:406:9) 在错误 (/Users/jim/ Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:286:5) 在 connectCallback (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/ mongodb/lib/operations/mongo_client_ops.js:241:5) 在进程中。nextTick (/Users/jim/Git-Projects/MongoDB-Change-Streams-Example/node_modules/mongodb/lib/operations/mongo_client_ops.js:463:7) 在 _combinedTickCallback (internal/process/next_tick.js:131:7)在 process._tickCallback (internal/process/next_tick.js:180:9)
我有什么办法可以同时使用 Mongo 变更流和 ws websocket 库吗?谢谢!