具体发生的情况是,在与 MongoDB 的连接期间,changeStream
如果我刷新应用程序,然后在它通过连接触发时更新数据,则数据将被复制,并且它会增加我刷新的次数。因此,如果我刷新浏览器 5 次,那么在数据更新时通过管道传输的数据将是 5 组通过流传输的相同数据。
从表面上看,我认为这不是 MongoDB 的changeStream
做法,但我不是 100% 确定。当我自己使用changeStream
它时,没有 SSE 连接,这不会发生。无论我刷新浏览器多少次,都会按预期进行一对一的更改。
我认为可以缓解这个问题的是,在关闭浏览器或刷新它会触发连接关闭事件时,我可以结束 SSE 连接并重新启动它或刷新它。但是,当我从客户端关闭 SSE 连接时,它不会在刷新重置后重新建立。我相信关闭连接会清除流并重新开始,但我不确定。
架构如下。有一个路由设置为服务器发送事件 SSE,它将端点连接到 Angular 客户端。
exports.GetCovid19InitialUserStream = (req, res) => {
initialize(req, res);
const initialize = (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // flush the headers to establish SSE with client
res.write("retry: 5000\n\n");
client.connect( err => {
const collection = client.db(dbName);
testthis(collection, res);
next: (v) => {
console.log(`observerA: ${JSON.stringify(v)}`);
res.write(`data: ${JSON.stringify(v)}\n\n`);
// res.end();
// return;
complete: () => console.log('COMPLETE ')
res.on('close', () => {
console.log('client dropped me');
// clearInterval(interValID);
// res.end();
const subject = new Subject<any>();
const testthis = async (client, res) => {
const collection = client.collection(documentName);
const changeStreamIterator = collection.watch({ fullDocument: 'updateLookup' });
changeStreamIterator.on('change', next => {
console.log('THIS IS NEXT ============================== ', next.fullDocument);
// res.write(`data: ${JSON.stringify(next.fullDocument)}\n\n`); // res.write() instead of res.send()
// return next.fullDocument;
// process next document
// while(true) {
// const next = await changeStreamIterator.next();
// subject.next(next);
// }
getServerSentEvent(url?: string): Observable<any> {
return Observable.create(observer => {
if (this.isBrowser) {
this._zone.runOutsideAngular(() => {
const eventSource: EventSource = this.getEventSource(url);
eventSource.onmessage = event => {
console.log('event ', event);
this._zone.run(() => {
// console.log('event.lastEventId ', event.lastEventId);
// if (event.lastEventId === '-1') {
// console.log('I should CLOSE --------- ', event.lastEventId);
// }
// eventSource.close();
eventSource.onerror = error => {
console.log('this is the error ', error);
this._zone.run(() => {
// observer.error(error);
// if I close then there can be no more messages sent
// eventSource.close();
private getEventSource(url: string): EventSource {
if (this.isBrowser) {
console.log('this happened here 2');
return new EventSource(url);
我不知道这是否很重要,或者我是否应该忽略它,因为它发生的可能性很小,但我觉得设置有问题,我需要关闭 SSE 连接并刷新或重新启动它或者是其他东西。