2

下面这个简单的示例会导致异常:

/**
 * Subscribes to the change stream of the {@code gameEnitites} collection in the
 * {@code import} database, requests a single element and cancels the subscription
 * from a separate thread about 100 ms later. The main thread then sleeps for 10 s
 * so that anything the driver does asynchronously after the cancel can be observed.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the main thread is interrupted while sleeping
 */
public static void main(String[] args) throws InterruptedException {

    // try-with-resources: the client is closed when the demo ends, even if the
    // final sleep is interrupted, instead of being leaked.
    try (MongoClient mongoClient = MongoClients.create("mongodb://localhost")) {
        MongoDatabase db = mongoClient.getDatabase("import");
        MongoCollection<Document> gameEntityCollection = db.getCollection("gameEnitites");
        gameEntityCollection.watch().subscribe(new Subscriber<ChangeStreamDocument<Document>>() {

            @Override
            public void onSubscribe(Subscription s) {
                // Ask for exactly one change event ...
                s.request(1);
                // ... and cancel shortly afterwards from another thread.
                new Thread(() -> {
                    try {
                        Thread.sleep(100);
                        s.cancel();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Restore the interrupt flag so the interruption is not lost.
                        Thread.currentThread().interrupt();
                    }
                }).start();
            }

            @Override
            public void onNext(ChangeStreamDocument<Document> t) {
                // Intentionally empty: received events are ignored in this demo.
            }

            @Override
            public void onError(Throwable t) {
                // Intentionally empty: errors signalled to the subscriber are ignored in this demo.
            }

            @Override
            public void onComplete() {
                // Intentionally empty.
            }
        });

        // Keep the JVM alive long enough to observe asynchronous driver activity.
        Thread.sleep(10000);
    }
}

该异常似乎是无害的,并且是异步记录的。

com.mongodb.MongoException: state should be: open
    at com.mongodb.MongoException.fromThrowableNonNull(MongoException.java:79) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.async.client.AbstractSubscription.onError(AbstractSubscription.java:135) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:93) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.async.client.MongoIterableSubscription$2.onResult(MongoIterableSubscription.java:85) ~[mongodb-driver-async-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:133) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$3.onResult(AsyncChangeStreamBatchCursor.java:129) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:168) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncChangeStreamBatchCursor$4.onResult(AsyncChangeStreamBatchCursor.java:159) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:331) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:310) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:242) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:83) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:401) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203) ~[mongodb-driver-core-3.11.1.jar:na]
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127) ~[na:na]
    at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:219) ~[na:na]
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.session.BaseClientSessionImpl.advanceOperationTime(BaseClientSessionImpl.java:107) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.session.ClientSessionContext.advanceOperationTime(ClientSessionContext.java:70) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.ClusterClockAdvancingSessionContext.advanceOperationTime(ClusterClockAdvancingSessionContext.java:76) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.updateSessionContext(InternalStreamConnection.java:537) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.access$800(InternalStreamConnection.java:76) ~[mongodb-driver-core-3.11.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:385) ~[mongodb-driver-core-3.11.1.jar:na]
    ... 13 common frames omitted

我能做些什么呢?

4

1 回答 1

0

我在 Scala 代码中遇到过类似的问题,并花了一些时间进行调查。最终我得出结论:这是驱动程序中的一个 bug。

在 GitHub 存储库( https://github.com/eugeneatnezasa/unclosableMongoDBChangeStream )中,您可以找到解决此问题的示例,不过它是用 Scala 编写的。其思路是:驱动程序会一直收集并发送所请求数量的记录(在您的示例中,s.request(1) 请求的数量为 1),在此之前订阅并不会真正被取消(cancel)。因此,要解决此问题,您需要先消费完所有已请求的记录。这很奇怪,但至少您不会再遇到该异常……

于 2020-02-06T08:07:00.200 回答