0

使用响应式 mongo 模板,我试图只为插入操作收听 6 个集合的 mongo 更改流。我开始收听更改流的代码如下:

更改流启动:

//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
          .doOnNext(changeStreamEvent -> {          
            System.out.println("In doOnNext: Received a new change stream event ");         
          })
          .map(changeStreamEvent -> {    
            //save resume token
          })
          .onErrorResume(throwable -> {
            System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
            return null;
          })
          .subscribe();

在每个新条目上,我将简历令牌保存在我的一个集合中:

//Document
public class MongoChangeStreamEvent implements Serializable {
  @Id
  private String id;
  private String resumeToken;
  //other fields and getters and setters
}

使用变更流选项进行启动。如果集合中存在恢复令牌,请使用它。否则,立即恢复。


ChangeStreamOptions changeStreamOptions;
    final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                     //MongoChangeStreamEvent collection in descending order based on 
                                     //resumeToken

    if (!resumeToken.isEmpty()) {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
          .resumeAfter(resumeToken)
          .build();
    } else {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))        
          .resumeAt(Instant.now())
          .build();
    }

抛出的错误:

Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message 
'cannot resume stream; the resume token was not found. {_data: 
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'

到目前为止,我对这个功能的工作有不同的体验。在没有现有恢复令牌的情况下启动的应用程序始终按预期工作。当我使用现有的恢复令牌重新启动应用程序时的其他发现如下:

  1. 它有时对所有 6 个都非常有效。
  2. 刚重新启动后,它启动了一些,其余的则失败了。
  3. 重新启动后,启动没有抛出任何错误。但是在正在监视的集合中插入文档时,很少/全部出错。

我了解更改流取决于文档中引用的 oplogs 历史记录。更令我惊讶的是,出错的简历令牌与我现有的任何简历令牌都不匹配,并且也不存在于 oplogs 中。

我已经验证提交给响应式 mongo 模板的简历令牌始终是正确的。

  1. 如果我遗漏了什么,请告诉我。
  2. 另外,我很想知道如何处理运行的许多更改流中的一个/几个失败。
4

1 回答 1

0
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                 //MongoChangeStreamEvent collection in descending order based on 
                                 //resumeToken

这个逻辑基于什么文档?

在当前的驱动程序中,驱动程序应该具有将更改流的恢复令牌传递给应用程序的规定。此简历令牌可以来自多个来源,并且它也可能具有不同的格式。在任何情况下都是一个不透明的标识符,应用程序不应该对恢复令牌或类似的东西进行排序。

例如,您可以查看何时检索到每个恢复令牌并根据该时间戳进行排序,但适用于恢复令牌的唯一操作是将其返回给驱动程序以启动新的更改流。

Ruby 驱动程序示例:https ://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream

于 2020-05-04T18:22:19.477 回答