使用响应式 mongo 模板,我试图只为插入操作收听 6 个集合的 mongo 更改流。我开始收听更改流的代码如下:
更改流启动:
//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
.doOnNext(changeStreamEvent -> {
System.out.println("In doOnNext: Received a new change stream event ");
})
.map(changeStreamEvent -> {
//save resume token
})
.onErrorResume(throwable -> {
System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
return null;
})
.subscribe();
在每个新条目上,我将简历令牌保存在我的一个集合中:
//Document
public class MongoChangeStreamEvent implements Serializable {
@Id
private String id;
private String resumeToken;
//other fields and getters and setters
}
使用变更流选项进行启动。如果集合中存在恢复令牌,请使用它。否则,立即恢复。
ChangeStreamOptions changeStreamOptions;
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting
//MongoChangeStreamEvent collection in descending order based on
//resumeToken
if (!resumeToken.isEmpty()) {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAfter(resumeToken)
.build();
} else {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAt(Instant.now())
.build();
}
抛出的错误:
Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message
'cannot resume stream; the resume token was not found. {_data:
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'
到目前为止,我对这个功能的工作有不同的体验。在没有现有恢复令牌的情况下启动的应用程序始终按预期工作。当我使用现有的恢复令牌重新启动应用程序时的其他发现如下:
- 它有时对所有 6 个都非常有效。
- 刚重新启动后,它启动了一些,其余的则失败了。
- 重新启动后,启动没有抛出任何错误。但是在正在监视的集合中插入文档时,很少/全部出错。
我了解更改流取决于文档中引用的 oplogs 历史记录。更令我惊讶的是,出错的简历令牌与我现有的任何简历令牌都不匹配,并且也不存在于 oplogs 中。
我已经验证提交给响应式 mongo 模板的简历令牌始终是正确的。
- 如果我遗漏了什么,请告诉我。
- 另外,我很想知道如何处理运行的许多更改流中的一个/几个失败。