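I have a problem with MongoDB (version 4.2), specifically with the Change Stream feature.
I have a replica set consisting of 1 primary, 1 secondary, and 1 arbiter. In my Java code, which uses mongodb-driver-sync 4.2.0-beta1, I call .watch() on a collection of interest, as shown below: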
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replica");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collectionStream = database.getCollection("myCollection");
List<Bson> pipeline = Arrays.asList(Aggregates.match(Filters.and(Filters.in("operationType", Arrays.asList("insert", "update", "replace", "invalidate")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = collectionStream.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
ChangeStreamDocument<Document> streamedEvent = cursor.next();
System.out.println("Streamed event: " + streamedEvent);
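Basically, the stream works fine. When an insert or update occurs, the event is detected and the document is streamed correctly. However, when one of the two data-bearing nodes (the primary or the secondary) goes down, the watcher stops streaming anything. Inserts and updates continue to be applied to the database as normal, but the stream is blocked. As soon as I restart the stopped node, the stream resumes immediately and delivers the events that were not streamed earlier. If the arbiter goes down, on the other hand, the stream keeps working normally.
Below is the output of rs.conf(). As you can see, the write concern (w) is 1.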
{
    "_id" : "replica",
    "version" : 31,
    "protocolVersion" : NumberLong(1),
    "writeConcernMajorityJournalDefault" : true,
    "members" : [
        {
            "_id" : 0,
            "host" : "host1:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1000,
            "tags" : {
            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 1,
            "host" : "host2:27017",
            "arbiterOnly" : false,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 1,
            "tags" : {
            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        },
        {
            "_id" : 4,
            "host" : "host2:27018",
            "arbiterOnly" : true,
            "buildIndexes" : true,
            "hidden" : false,
            "priority" : 0,
            "tags" : {
            },
            "slaveDelay" : NumberLong(0),
            "votes" : 1
        }
    ],
    "settings" : {
        "chainingAllowed" : true,
        "heartbeatIntervalMillis" : 500,
        "heartbeatTimeoutSecs" : 3,
        "electionTimeoutMillis" : 3000,
        "catchUpTimeoutMillis" : -1,
        "catchUpTakeoverDelayMillis" : 30000,
        "getLastErrorModes" : {
        },
        "getLastErrorDefaults" : {
            "w" : 1,
            "j" : false,
            "wtimeout" : 0
        },
        "replicaSetId" : ObjectId("5f16a4e1e1c622bbea578576")
    }
}
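One way to reason about the behavior described above, offered only as a rough sketch: change streams report events once they have been committed to a majority of data-bearing members, so it may help to count how many data-bearing members remain in each failure scenario. The class and method names below (MajorityCheck, majority, canMajorityCommit) are hypothetical, and the formula floor(n / 2) + 1 is a simplified assumption about how the majority is calculated, not MongoDB's actual implementation.

```java
// Simplified, hypothetical model of the replica set above; not MongoDB's real logic.
final class MajorityCheck {

    /** Majority of the voting members, assumed here to be floor(n / 2) + 1. */
    static int majority(int votingMembers) {
        return votingMembers / 2 + 1;
    }

    /**
     * True if enough data-bearing voting members are reachable for a write
     * to be replicated to a majority. Arbiters vote but hold no data, so
     * they must not be counted in dataBearingUp.
     */
    static boolean canMajorityCommit(int votingMembers, int dataBearingUp) {
        return dataBearingUp >= majority(votingMembers);
    }

    public static void main(String[] args) {
        int votingMembers = 3; // primary + secondary + arbiter, each with "votes" : 1

        // Arbiter down: both data-bearing members are still up.
        System.out.println("arbiter down:   " + canMajorityCommit(votingMembers, 2));
        // Primary or secondary down: only one data-bearing member is left.
        System.out.println("data node down: " + canMajorityCommit(votingMembers, 1));
    }
}
```

Under these assumptions the sketch prints true for the arbiter case and false for the data-node case, which matches what I observe: writes with w: 1 still succeed, but nothing new becomes majority-committed while a data-bearing node is down.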
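Can anyone help me with this problem?
Update: to work around this issue, I set replication.enableMajorityReadConcern to false in the .conf file of every node, in order to disable read concern "majority". With this setting, however, whenever I stop either the primary or the secondary, I always get the following exception in the console: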
Exception in thread "main" com.mongodb.MongoExecutionTimeoutException: Error waiting for snapshot not less than { ts: Timestamp(1605805914, 1), t: -1 }, current relevant optime is { ts: Timestamp(1605805864, 1), t: 71 }. :: caused by :: operation exceeded time limit
at com.mongodb.internal.connection.ProtocolHelper.createSpecialException(ProtocolHelper.java:239)
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:171)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:359)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:280)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:259)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:110)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:336)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:222)
at com.mongodb.internal.operation.CommandOperationHelper$5.call(CommandOperationHelper.java:208)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:205)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:189)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:325)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:60)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:178)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158)
at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153)
at com.softstrategy.ProvaWatcher.ProvaWatcherApplication.main(ProvaWatcherApplication.java:34)
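On the other hand, if I comment out enableMajorityReadConcern in every node's .conf file, so that the default applies, the exception does not occur.
So my two questions are:
- Why is this exception thrown only when enableMajorityReadConcern is set to false and the node that goes down is a data-bearing node?
- Why is the exception never thrown when the arbiter goes down, regardless of the enableMajorityReadConcern setting?
Thanks!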