1

根据mongodb 文档

在无效事件(例如,集合删除或重命名)关闭流后,您不能使用 resumeAfter 来恢复更改流。从 MongoDB 4.2 开始,您可以使用 startAfter 在无效事件之后启动新的更改流。

我使用此代码在集合重命名后恢复更改流:

var collection = db.GetCollection<Student>("bla");
var student1 = new Student();
var are = new AutoResetEvent(false);

//RESUME TOKEN
BsonDocument resumeToken = null;

var streamListener = Task.Run(() =>
{
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");

    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    using var enumerator = cursor.ToEnumerable().GetEnumerator();
    
    //IT'S OK HERE
    enumerator.MoveNext();
    var event1 = enumerator.Current;
    event1.FullDocument.Id.Should().Be(student1.Id);
    event1.OperationType.Should().Be(ChangeStreamOperationType.Insert);
    
    //SAVE RESUME TOKEN
    resumeToken = event1.ResumeToken;
});

are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;

//RENAME COLLECTION
await db.RenameCollectionAsync(collection.CollectionNamespace.CollectionName, collection.CollectionNamespace.CollectionName + "tmp");

collection = db.GetCollection<Student>("bla");
streamListener = Task.Run(() =>
{
    //RESTORE CHANGE STREAM
    var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, StartAfter = resumeToken };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Student>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");

    using var cursor = collection.Watch(pipeline, options);
    are.Set();
    enumerator.MoveNext();// IT RESTURNS FALSE IMMEDIATELY
});

are.WaitOne();
await collection.InsertOneAsync(student1);
await streamListener;

但是当我尝试恢复更改流(我将恢复令牌传递到StartAfter)时,它会立即退出。我的代码有什么问题?

  • db.version()-4.4.0
  • MongoDB.Driver-2.11.1
4

1 回答 1

1

在这里我找到了答案:

与 resumeAfter 不同,startAfter 可以在无效事件后通过创建新的更改流来恢复通知。

我也添加到此列表{ operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } } invalidate事件中,现在它按预期工作。

于 2021-01-22T19:45:01.563 回答