0

我在我的应用程序中有一个地方想使用 Mongo (3.6) 作为多线程的锁(在不同的服务器上)。本质上类似于“如果一个线程开始工作,其他线程应该通过 mongo 看到它并且不要并行启动相同的工作”。

从我学到的文档中

结合“majority”写关注点,“线性化”读关注点可以让多个线程对单个文档进行读写,就好像单个线程实时执行这些操作一样;

所以这对我来说听起来不错,如果一个线程开始工作,我会插入某个文档,而其他线程会检查该文档是否已经存在,如果存在则不启动,但它不适用于我的情况。

我准备了两个测试——一个成功阻塞第二个线程的非并行测试——但是并行测试失败了,我得到了其中两个RebuildLog文档。

using System;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;

namespace FindOneAndUpdateTests
{
    public class FindOneAndUpdateTests
    {
        private static IMongoDatabase GetDatabase()
        {
            var dbName = "test";
            var client = new MongoClient("mongodb://localhost:45022");

            return client.GetDatabase(dbName);
        }

        private IMongoCollection<RebuildLog> GetCollection()
        {
            return GetDatabase().GetCollection<RebuildLog>("RebuildLog");
        }

        [Fact]
        public async Task FindOneAndUpdate_NotParallel_Test()
        {
            var dlpId = Guid.NewGuid();

            var first = await FindOneAndUpdateMethod(dlpId);
            var second = await FindOneAndUpdateMethod(dlpId);

            first.Should().BeFalse();
            second.Should().BeTrue();
        }

        [Fact]
        public async Task FindOneAndUpdate_Parallel_Test()
        {
            var dlpId = Guid.NewGuid();

            var taskFirst = FindOneAndUpdateMethod(dlpId);
            var taskSecond = FindOneAndUpdateMethod(dlpId);

            var first = await taskFirst;
            var second = await taskSecond;

            first.Should().BeFalse();
            second.Should().BeTrue();
        }

        private async Task<bool> FindOneAndUpdateMethod(Guid dlpId)
        {
            var mongoCollection = GetCollection();

            var filterBuilder = Builders<RebuildLog>.Filter;
            var filter = filterBuilder.Where(w => w.DlpId == dlpId);

            var creator = Builders<RebuildLog>.Update
                .SetOnInsert(w => w.DlpId, dlpId)
                .SetOnInsert(w => w.ChangeDate, DateTime.UtcNow)
                .SetOnInsert(w => w.BuildDate, DateTime.UtcNow)
                .SetOnInsert(w => w.Id, Guid.NewGuid());

            var options = new FindOneAndUpdateOptions<RebuildLog>
            {
                IsUpsert = true,
                ReturnDocument = ReturnDocument.Before
            };

            var result = await mongoCollection
                .WithWriteConcern(WriteConcern.WMajority)
                .WithReadConcern(ReadConcern.Linearizable)
                .FindOneAndUpdateAsync(filter, creator, options);

            return result != null;
        }
    }

    [BsonIgnoreExtraElements]
    public class RebuildLog
    {
        public RebuildLog()
        {
            Id = Guid.NewGuid();
        }

        public Guid Id { get; set; }
        public DateTime ChangeDate { get; set; }
        public string ChangeUser { get; set; }
        public Guid DlpId { get; set; }
        public string Portal { get; set; }
        public DateTime? BuildDate { get; set; }
    }
}

我怀疑我对原子手工制作的 GetOrInsert 的想法(参见带有 IsUpsert 的 FindOneAndUpdate)打破了文档中“在单个文档上”的约束。有什么想法可以解决这个问题还是不可能?

4

1 回答 1

0

挺有趣的。可能是您在 DlpId 上没有唯一索引吗?这就是为什么 mongo 决定不需要顺序执行这些操作的原因,因为在您的情况下,它不是先写后读模式(正如它在“客户端会话和因果一致性保证”中指出的那样)。它同时更新或创建两次。那这个呢?:

public class SyncDocument 
{
    // ...
    [BsonElement("locked"), BsonDefaultValue(false)]
    public bool Locked { get; set; }
}

在客户端代码中:

var filter = Builders<SyncDocument>.Filter.Eq(d => d.Locked, false);
var update = Builders<SyncDocument>.Update.Set(d => d.Locked, true);
var result = collection.UpdateOne(filter, update);
if (result.ModifiedCount == 1) {
    Console.WriteLine("Lock acquired");
}

应在应用程序启动之前创建具有锁定字段的文档(如果它适用于您的任务)。

于 2019-05-15T12:39:07.873 回答