根据 MongoDB 文档:
在失效(invalidate)事件(例如,集合被删除或重命名)关闭更改流之后,您不能使用 resumeAfter 来恢复该更改流。从 MongoDB 4.2 开始,您可以使用 startAfter 在失效事件之后启动新的更改流。
我使用此代码在集合重命名后恢复更改流:
// Reproduction: watch collection "bla", remember the resume token of the first
// insert event, rename the collection, then open a second change stream on
// "bla" with StartAfter set to that token.
var collection = db.GetCollection<Student>("bla");
var student1 = new Student();
// Set by each listener once its cursor is open, so the insert that follows is
// only issued after Watch() has returned.
var are = new AutoResetEvent(false);
// Resume token captured by the first listener and reused by the second one.
BsonDocument resumeToken = null;
var streamListener = Task.Run(() =>
{
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    using var enumerator = cursor.ToEnumerable().GetEnumerator();
    // The first listener behaves as expected: the insert event is delivered.
    enumerator.MoveNext();
    var event1 = enumerator.Current;
    event1.FullDocument.Id.Should().Be(student1.Id);
    event1.OperationType.Should().Be(ChangeStreamOperationType.Insert);
    // Save the resume token of the insert event for the second listener.
    resumeToken = event1.ResumeToken;
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
// Rename the watched collection ("bla" -> "blatmp").
await db.RenameCollectionAsync(collection.CollectionNamespace.CollectionName, collection.CollectionNamespace.CollectionName + "tmp");
collection = db.GetCollection<Student>("bla");
streamListener = Task.Run(() =>
{
    // Try to restore the change stream from the saved token.
    // NOTE(review): resumeToken is the token of the insert event, not of the
    // invalidate event produced by the rename, and the $match stage below
    // filters 'invalidate' events out so their token is never observed.
    // Presumably StartAfter needs the invalidate event's token to get past
    // the rename - confirm against the MongoDB change stream documentation.
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, StartAfter = resumeToken };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    // The enumerator declared in the first listener is out of scope here, so
    // this listener needs its own enumerator over the new cursor.
    using var enumerator = cursor.ToEnumerable().GetEnumerator();
    // Observed behaviour reported in the question: MoveNext() returns false immediately.
    enumerator.MoveNext();
});
are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;
但是,当我尝试恢复更改流(把恢复令牌传给 StartAfter)时,它会立即退出。
我的代码有什么问题?
环境版本:
db.version():4.4.0
MongoDB.Driver:2.11.1
