0

我正在构建一个基于CDC的应用程序,该应用程序使用Mongo Change Streams来侦听更改事件并近乎实时地索引 elasticsearch 中的更改。

到目前为止,我已经实现了一个工作程序,它调用一个函数来捕获事件、转换它们并在 elasticsearch 中对它们进行索引,在为 1 个 mongo 集合实现流时没有任何问题:

function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      const event = stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

我已经使用无限循环(可能是一种不好的方法)实现了它,但我不确定当我必须保持更改流永远存在时有什么替代方案。

当我必须为另一个模型实现更改流时,问题就来了。由于第一个函数有一个阻塞的 while 循环,因此工作人员无法调用第二个函数来启动第二个更改流。

我想知道最好的方法是启动一个可以触发 x no 的工人。在不影响每个更改流的性能的情况下更改流。工作线程是正确的方法吗?

4

1 回答 1

2

在 Node.js 中使用 Change Streams 有三种主要方法。

  1. 您可以使用 EventEmitter 的on()函数监控变更流。

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
     // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
     // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
     changeStream.on('change', (next) => {
         console.log(next);
     });
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    
  2. 您可以使用hasNext()监控变更流。

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // Set a timer that will close the change stream after the given amount of time
     // Function execution will continue because we are not using "await" here
     closeChangeStream(timeInMs, changeStream);
    
     // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
     // If the change stream is closed, hasNext() will return false so the while loop will exit.
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
     while (await changeStream.hasNext()) {
         console.log(await changeStream.next());
     }
    
  3. 您可以使用 Stream API 监控变更流

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
     changeStream.pipe(
         new stream.Writable({
             objectMode: true,
             write: function (doc, _, cb) {
                 console.log(doc);
                 cb();
             }
         })
     );
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    

如果您的 MongoDB 数据库托管在 Atlas ( https://cloud.mongodb.com ) 上,最简单的做法是创建一个Trigger。Atlas 为您处理变更流代码的编程,因此您只需编写将转换事件并在 Elasticsearch 中为它们编制索引的代码。

有关使用变更流和触发器的更多信息,请参阅我的博客文章GitHub 上提供了上述所有片段的完整代码示例。

于 2020-08-07T12:12:57.817 回答