1

我正在使用官方节点驱动程序连接到我的 MongoDB。

我正在尝试查看特定集合和特定文档的更改。两种方式都遇到此错误。

我的代码大部分时间都运行良好,但有时在空闲时它会随机使我的整个应用程序崩溃。我想了解为什么会发生这种情况,并抓住错误,这样它就不会破坏我的整个应用程序。不幸的是,它从不指向我代码中的特定行。

这是我的代码之一(简化):

let watcher = q.watch({ids: [documentId]}).on("change", (changeEvent) => { 
    //only listen to update operations
    if (changeEvent.operationType !== 'update') return;
    console.log('document was updated');
    watcher.cursor.close();
});

这给了我这个错误:

node:events:371
      throw er; // Unhandled 'error' event
      ^

MongoServerError: Cursor session id (8c01dfe6-6a3e-479c-98ce-243d61ec24e4 - 6tctnoM1KQxUOgLaEIm7HYAvVSF3YGlrCMxn+SswizE=) is not the same as the operation context's session id (none)
    at MessageStream.messageHandler (C:\project\node_modules\mongodb\lib\cmap\connection.js:463:30)
    at MessageStream.emit (node:events:394:28)
    at processIncomingData (C:\project\node_modules\mongodb\lib\cmap\message_stream.js:108:16)
    at MessageStream._write (C:\project\node_modules\mongodb\lib\cmap\message_stream.js:28:9)
    at writeOrBuffer (node:internal/streams/writable:389:12)
    at _write (node:internal/streams/writable:330:10)
    at MessageStream.Writable.write (node:internal/streams/writable:334:10)
    at TLSSocket.ondata (node:internal/streams/readable:754:22)
    at TLSSocket.emit (node:events:394:28)
    at addChunk (node:internal/streams/readable:315:12)
Emitted 'error' event on ChangeStream instance at:
    at closeWithError (C:\project\node_modules\mongodb\lib\change_stream.js:355:22)
    at processError (C:\project\node_modules\mongodb\lib\change_stream.js:445:12)
    at Readable.<anonymous> (C:\project\node_modules\mongodb\lib\change_stream.js:364:33)
    at Readable.emit (node:events:394:28)
    at emitErrorNT (node:internal/streams/destroy:157:8)
    at emitErrorCloseNT (node:internal/streams/destroy:122:3)
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  operationTime: Timestamp { low: 1, high: 1629924381, unsigned: true },
  ok: 0,
  code: 13,
  codeName: 'Unauthorized',
  '$clusterTime': {
    clusterTime: Timestamp { low: 1, high: 1629924381, unsigned: true },
    signature: {
      hash: Binary {
        sub_type: 0,
        buffer: Buffer(20) [Uint8Array] [
          104,  40, 136,  32, 223, 251,
          247,  64,  47, 185, 197, 229,
          247,  96, 219, 103, 110, 116,
           65, 183
        ],
        position: 20
      },
      keyId: Long { low: 5, high: 1629063232, unsigned: false }
    }
  }
}

我还有其他代码,偶尔也会以同样的方式崩溃:

queueCollection.watch([{ $match: { operationType: "insert" }}])
    .on("change", (changeEvent) => { 
        console.debug('document added',currentlyProcessing?'currentlyProcessing':'notCurrentlyProcessing');
        if (!currentlyProcessing) processNextQueueItem();
    });

如果有人能解释为什么会发生这个错误,或者当它发生时如何捕捉和修复观察者,那就太好了。

4

1 回答 1

0

我仍然不知道如何防止它,但这里是如何破解它:

//watches the collection for any inserted documents
function startWatcher (collection) {
    console.log('starting watcher');
    let watcher = collection.watch([{ $match: { operationType: "insert" }}])
        .on("change", (changeEvent) => {
            /*whatever you want to do when event is fired*/
        })
        //changestream connection died (cursor session id is not the same as the operation contexts session id)
        .on('error', e => {
            console.error('watcher died');

            //close current connection
            watcher.cursor.close();

            //restart new watcher
            startWatcher(collection);
        });
}

startWatcher(db.collection('collectionname'));
于 2021-09-17T19:24:29.763 回答