I am using the Kafka MongoDB source connector [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] with Confluent Platform v5.4.1 and a MongoDB v3.6 ReplicaSet. The source connector had been deleted, and when it was re-created about a month later, I got the following error:
com.mongodb.MongoQueryException: Query failed with error code 280 and error message 'resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F06E90400000004463C5F6964003C38316266623663632D326638612D343530662D396534652D31393936336362376130386500005A1004A486EE3E58984454ADD5BF58F364361E04")}' on server 40.118.122.226:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:29)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:267)
at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:86)
at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:83)
at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:166)
at com.mongodb.operation.ChangeStreamBatchCursor.tryNext(ChangeStreamBatchCursor.java:83)
at com.mongodb.client.internal.MongoChangeStreamCursorImpl.tryNext(MongoChangeStreamCursorImpl.java:78)
at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:338)
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-07-09 09:53:09,353] INFO Watching for collection changes on '<myDBName.myCollectionName>' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
After researching the cause of this error, I understand that the resume token can no longer be found in the oplog: the oplog is a capped (fixed-size) collection, so it purges old entries. I also understand that to minimize the chance of this happening I should increase the oplog size, etc. But I would like to know whether this can be fixed from the Kafka/Confluent Platform side instead. For example, could I delete the Kafka topic (I am using the topic 'myDBName.myCollectionNamedata'), delete the KSQL streams created from that topic's data, or do something in Kafka Connect, so that the MongoDB source connector discards the stale resume token and starts capturing changes in the MongoDB collection again from the current time?
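One common workaround (a sketch, not an official fix) relies on the fact that Kafka Connect keys stored source offsets by connector name: deleting the failed connector via the Connect REST API and re-creating it under a *new* name makes Connect look up a fresh, empty offset, so the connector opens a new change stream from the current time instead of trying to resume from the stale token. The connector name, host/port, and config values below are placeholders; substitute your own.

```shell
# Remove the connector whose stored resume token is no longer in the oplog.
# "mongo-source" is a hypothetical connector name; use your actual one.
curl -X DELETE http://localhost:8083/connectors/mongo-source

# Re-create it under a different name ("mongo-source-v2" here), so that
# Kafka Connect finds no saved offset for it and the MongoDB source
# connector starts a brand-new change stream from "now".
curl -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors \
  -d '{
        "name": "mongo-source-v2",
        "config": {
          "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
          "connection.uri": "mongodb://40.118.122.226:27017",
          "database": "myDBName",
          "collection": "myCollectionName"
        }
      }'
```

Note that this only resets where the connector reads from; any events that were purged from the oplog while the connector was down are lost, and downstream KSQL streams/topics will simply see a gap rather than a replay.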