我正在尝试解决 mongodb 更改流错误。我正在关注与...的集合的变化
public Flux<Clone> watchForCloneCollectionChanges() {
// set changestream options to watch for any changes to the clone collection
Criteria criteria = new Criteria();
criteria.orOperator(where("operationType").is("insert"), where("operationType").is("replace"));
ChangeStreamOptions options = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(Clone.class,
Aggregation.match(criteria)
)).returnFullDocumentOnUpdate().build();
// return a flux that watches the changestream and returns the full document
return reactiveMongoTemplate.changeStream("clone", options, Clone.class)
.map(ChangeStreamEvent::getBody)
.doOnError(throwable -> logger.error("Error with the teams changestream event: " + throwable.getMessage(), throwable));
}
使用处理程序
public Mono<ServerResponse> watchClones(ServerRequest request) {
Flux<ServerSentEvent<Clone>> sse = this.cloneWatcher.watchForCloneCollectionChanges()
.map(clone -> ServerSentEvent.<Clone>builder()
.data(clone)
.build());
return ServerResponse.ok().body(BodyInserters.fromServerSentEvents(sse));
和路由器功能
@Bean
public RouterFunction<ServerResponse> route(CloneHandler cloneHandler) {
return RouterFunctions
.route(RequestPredicates.GET("/clones"), cloneHandler::getClones)
.andRoute(RequestPredicates.GET("/clones/watch"), cloneHandler::watchClones);
}
对于我的前端,我使用 Angular 使用行为主题和 Eventsource 进行连接。我遇到的问题是在我关闭页面后,即使前端已关闭事件源,changeStream 似乎仍在监视更改。结果,当更新文档时,spring 会引发以下错误:
2020-01-06 15:22:34.312 错误 27772 --- [andler-executor] org.mongodb.driver.operation:回调 onResult 调用产生错误
com.mongodb.MongoException:状态应该是:在 com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) ~[mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.async.client 打开.AbstractSubscription.onError(AbstractSubscription.java:135) ~[mongodb-driver-async-3.11.1.jar:na] at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:93) ~[mongodb -driver-async-3.11.1.jar:na] at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) ~[mongodb-driver-async-3.11.1.jar:na] at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:133) ~[mongodb-driver-core-3.11.1.jar:na] at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:129) ~[mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) [mongodb-driver-core-3.11.1.jar: na] 在 com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:168) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java :159) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) [mongodb-driver-core-3.11.1.jar: na] 在 com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:331) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.operation。AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:310) [mongodb-driver-core-3.11.1.jar:na] at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) [mongodb-driver -core-3.11.1.jar:na] 在 com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:242) [mongodb-driver-core-3.11.1.jar:na] 在 com .mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java: 83) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) [mongodb-driver-core-3.11.1.jar: na] 在 com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult (InternalStreamConnection.java:401) [mongodb-driver-core-3.11.1.jar:na] at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) [mongodb-driver-core- 3.11.1.jar:na] 在 com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb .internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java :514) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) [mongodb-driver-core-3.11.1.jar :na] 在 com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.AsynchronousChannelStream$ BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) [mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.connection.tlschannel。async.AsynchronousTlsChannel$3$1.run(AsynchronousTlsChannel.java:151) [mongodb-driver-core-3.11.1.jar:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na :1.8.0_172] 在 java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_172] 在 java.util.concurrent.FutureTask.run(FutureTask.java) [na :1.8.0_172] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_172] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na :1.8.0_172] 在 java.lang.Thread.run(Thread.java:748) [na:1.8.0_172] 引起:java.lang.IllegalStateException:状态应该是:在 com.mongodb.assertions.Assertions 处打开。 isTrue(Assertions.java:70) ~[mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb.internal.session。BaseClientSessionImpl.advanceOperationTime(BaseClientSessionImpl.java:107) ~[mongodb-driver-core-3.11.1.jar:na] at com.mongodb.internal.session.ClientSessionContext.advanceOperationTime(ClientSessionContext.java:70) ~[mongodb-driver -core-3.11.1.jar:na] 在 com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.advanceOperationTime(ClusterClockAdvancingSessionContext.java:76) ~[mongodb-driver-core-3.11.1.jar:na] 在 com.mongodb .internal.connection.InternalStreamConnection.updateSessionContext(InternalStreamConnection.java:537) ~[mongodb-driver-core-3.11.1.jar:na] at >>com.mongodb.internal.connection.InternalStreamConnection.access$800(InternalStreamConnection.java :76) ~[mongodb-driver-core-3.11.1.jar:na] 在 >>com.mongodb.internal.connection.InternalStreamConnection$2$1。onResult(InternalStreamConnection.java:385) [mongodb-driver-core-3.11.1.jar:na] ...省略了14个常用帧
我搜索了高低,但找不到解决方案。任何帮助,将不胜感激!谢谢!