0

我不知道这是否是一个严重的问题,因为该应用程序不是为人们在进行数据更新时不断刷新应用程序而设计的。但它仍然感觉像代码气味,似乎是明显的内存泄漏。

具体发生的情况是,在与 MongoDB 的连接期间,changeStream如果我刷新应用程序,然后在它通过连接触发时更新数据,则数据将被复制,并且它会增加我刷新的次数。因此,如果我刷新浏览器 5 次,那么在数据更新时通过管道传输的数据将是 5 组通过流传输的相同数据。

从表面上看,我认为这不是 MongoDB 的changeStream做法,但我不是 100% 确定。当我自己使用changeStream它时,没有 SSE 连接,这不会发生。无论我刷新浏览器多少次,都会按预期进行一对一的更改。

我认为可以缓解这个问题的是,在关闭浏览器或刷新它会触发连接关闭事件时,我可以结束 SSE 连接并重新启动它或刷新它。但是,当我从客户端关闭 SSE 连接时,它不会在刷新重置后重新建立。我相信关闭连接会清除流并重新开始,但我不确定。

架构如下。有一个路由设置为服务器发送事件 SSE,它将端点连接到 Angular 客户端。

exports.GetCovid19InitialUserStream = (req, res) => {
    initialize(req, res);
}

const initialize = (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // flush the headers to establish SSE with client
    res.write("retry: 5000\n\n");

    client.connect( err => {
        const collection = client.db(dbName);
        ....
        testthis(collection, res);
    });
    subject.subscribe({
        next: (v) => {
            console.log(`observerA: ${JSON.stringify(v)}`);
            res.write(`data: ${JSON.stringify(v)}\n\n`);
            // res.end();
            // return;
        },
        complete: () => console.log('COMPLETE ')
      });

    res.on('close', () => {
        console.log('client dropped me');
        // clearInterval(interValID);
        // res.end();
    });
}

这是changeStream代码:

const subject = new Subject<any>();
const testthis = async (client, res) => {
    const collection = client.collection(documentName);
    const changeStreamIterator = collection.watch({ fullDocument: 'updateLookup' });

    changeStreamIterator.on('change', next => {
        console.log('THIS IS NEXT ============================== ', next.fullDocument);
        subject.next(next.fullDocument);
        // res.write(`data: ${JSON.stringify(next.fullDocument)}\n\n`); // res.write() instead of res.send()
        // return next.fullDocument;
        // process next document
      });
    // while(true) {
    //     const next = await changeStreamIterator.next();
    //     subject.next(next);
    // }

}

客户端代码如下:

 getServerSentEvent(url?: string): Observable<any> {
    return Observable.create(observer => {
      if (this.isBrowser) {
        this._zone.runOutsideAngular(() => {
          const eventSource: EventSource = this.getEventSource(url);
          eventSource.onmessage = event => {
            console.log('event ', event);
            this._zone.run(() => {
              // console.log('event.lastEventId ', event.lastEventId);
              // if (event.lastEventId === '-1') {
              //   console.log('I should CLOSE --------- ', event.lastEventId);
              // }
              observer.next(event);

            });
          };
          // eventSource.close();
          eventSource.onerror = error => {
            console.log('this is the error ', error);
            this._zone.run(() => {
              // observer.error(error);
              // if I close then there can be no more messages sent
              // eventSource.close();
            });
          };
        })
      }


    });
  }
  private getEventSource(url: string): EventSource {
    if (this.isBrowser) {
      console.log('this happened here 2');
      return new EventSource(url);
    }

  }

我不知道这是否很重要,或者我是否应该忽略它,因为它发生的可能性很小,但我觉得设置有问题,我需要关闭 SSE 连接并刷新或重新启动它或者是其他东西。

4

0 回答 0