1

我正在编写一个简单的消息传递模块,因此一个进程可以发布消息,另一个进程可以订阅它们。我使用 EF/SqlServer 作为进程外通信机制。“服务器”只是发布者/订阅者对的共同名称(可以称为“通道”)。

我有以下方法向数据库中添加一行,表示一个名为“服务器”

    public void AddServer(string name)
    {
        if (!context.Servers.Any(c => c.Name == name))
        {
            context.Servers.Add(new Server { Name = name });
        }
    }

我遇到的问题是,当我同时启动两个客户端时,只有一个应该添加一个新的服务器条目,但是,这不是它的工作方式。实际上,我得到了两个具有相同名称的条目的非常错误的结果,并且意识到 Any() 防护不足以解决此问题。

服务器的实体使用一个 int PK,据说我的存储库将强制执行 Name 字段的唯一性。我开始认为这行不通。

public class Server
{
    public int Id { get; set; }
    public string Name { get; set; }
}

我认为可以解决此问题的两种方法似乎都不理想:

  1. 字符串主键
  2. 忽略异常

这是并发的问题,对吧?

在我希望两个客户端调用具有相同名称的存储库但在数据库中仅获得具有该名称的一行的结果的情况下,我该如何处理?

在此处输入图像描述

更新:这是存储库代码

namespace MyBus.Data
{
    public class Repository : IDisposable
    {
        private readonly Context context;
        private readonly bool autoSave;

        public delegate Chain Chain(Action<Repository> action);
        public static Chain Command(Action<Repository> action)
        {
            using (var repo = new Data.Repository(true))
            {
                action(repo);
            }
            return new Chain(next => Command(next));
        }

        public Repository(bool autoSave)
        {
            this.autoSave = autoSave;
            context = new Context();
        }

        public void Dispose()
        {
            if (autoSave)
                context.SaveChanges();
            context.Dispose();
        }

        public void AddServer(string name)
        {
            if (!context.Servers.Any(c => c.Name == name))
            {
                context.Servers.Add(new Server { Name = name });
            }
        }

        public void AddClient(string name, bool isPublisher)
        {
            if (!context.Clients.Any(c => c.Name == name))
            {
                context.Clients.Add(new Client
                {
                    Name = name,
                    ClientType = isPublisher ? ClientType.Publisher : ClientType.Subscriber
                });
            }
        }

        public void AddMessageType<T>()
        {
            var typeName = typeof(T).FullName;
            if (!context.MessageTypes.Any(c => c.Name == typeName))
            {
                context.MessageTypes.Add(new MessageType { Name = typeName });
            }
        }

        public void AddRegistration<T>(string serverName, string clientName)
        {
            var server = context.Servers.Single(c => c.Name == serverName);
            var client = context.Clients.Single(c => c.Name == clientName);
            var messageType = context.MessageTypes.Single(c => c.Name == typeof(T).FullName);
            if (!context.Registrations.Any(c =>
                    c.ServerId == server.Id &&
                    c.ClientId == client.Id &&
                    c.MessageTypeId == messageType.Id))
            {
                context.Registrations.Add(new Registration
                {
                    Client = client,
                    Server = server,
                    MessageType = messageType
                });
            }
        }

        public void AddMessage<T>(T item, out int messageId)
        {
            var messageType = context.MessageTypes.Single(c => c.Name == typeof(T).FullName);
            var serializer = new XmlSerializer(typeof(T));
            var sb = new StringBuilder();
            using (var sw = new StringWriter(sb))
            {
                serializer.Serialize(sw, item);
            }
            var message = new Message
            {
                MessageType = messageType,
                Created = DateTime.UtcNow,
                Data = sb.ToString()
            };
            context.Messages.Add(message);
            context.SaveChanges();
            messageId = message.Id;
        }

        public void CreateDeliveries<T>(int messageId, string serverName, string sendingClientName, T item)
        {
            var messageType = typeof(T).FullName;

            var query = from reg in context.Registrations
                        where reg.Server.Name == serverName &&
                              reg.Client.ClientType == ClientType.Subscriber &&
                              reg.MessageType.Name == messageType
                        select new
                        {
                            reg.ClientId
                        };

            var senderClientId = context.Clients.Single(c => c.Name == sendingClientName).Id;

            foreach (var reg in query)
            {
                context.Deliveries.Add(new Delivery
                {
                    SenderClientId = senderClientId,
                    ReceiverClientId = reg.ClientId,
                    MessageId = messageId,
                    Updated = DateTime.UtcNow,
                    DeliveryStatus = DeliveryStatus.Sent
                });
            }
        }

        public List<T> GetDeliveries<T>(string serverName, string clientName, out List<int> messageIds)
        {
            messageIds = new List<int>();
            var messages = new List<T>();
            var clientId = context.Clients.Single(c => c.Name == clientName).Id;
            var query = from del in context.Deliveries
                        where del.ReceiverClientId == clientId &&
                              del.DeliveryStatus == DeliveryStatus.Sent
                        select new
                        {
                            del.Id,
                            del.Message.Data
                        };
            foreach (var item in query)
            {
                var serializer = new XmlSerializer(typeof(T));
                using (var sr = new StringReader(item.Data))
                {
                    var t = (T)serializer.Deserialize(sr);
                    messages.Add(t);
                    messageIds.Add(item.Id);
                }
            }
            return messages;
        }

        public void ConfirmDelivery(int deliveryId)
        {
            using (var context = new Context())
            {
                context.Deliveries.First(c => c.Id == deliveryId).DeliveryStatus = DeliveryStatus.Received;
                context.SaveChanges();
            }
        }
    }
}
4

3 回答 3

1

您可以保留 int 主键,但也可以在列上定义唯一索引。Name

这样,在并发情况下,只有第一次插入会成功;任何尝试插入相同服务器名称的后续客户端都将失败并显示SqlException.

于 2013-07-08T20:24:29.053 回答
1

我目前正在使用这个解决方案:

    public void AddServer(string name)
    {
        if (!context.Servers.Any(c => c.Name == name))
        {
            context.Database.ExecuteSqlCommand(@"MERGE Servers WITH (HOLDLOCK) AS T
                                                 USING (SELECT {0} AS Name) AS S
                                                 ON T.Name = S.Name
                                                 WHEN NOT MATCHED THEN 
                                                 INSERT (Name) VALUES ({0});", name);
        }
    }
于 2013-07-08T23:38:56.947 回答
1

作为彻底的练习,我(我想我)以另一种方式解决了这个问题,它保留了 EF 上下文的类型安全,但增加了一些复杂性:

首先,在这篇文章中,我学习了如何向 Server 表添加唯一约束:

这是上下文代码:

    public class Context : DbContext
    {
        public DbSet<MessageType> MessageTypes { get; set; }
        public DbSet<Message> Messages { get; set; }
        public DbSet<Delivery> Deliveries { get; set; }
        public DbSet<Client> Clients { get; set; }
        public DbSet<Server> Servers { get; set; }
        public DbSet<Registration> Registrations { get; set; }

        public class Initializer : IDatabaseInitializer<Context>
        {
            public void InitializeDatabase(Context context)
            {
                if (context.Database.Exists() && !context.Database.CompatibleWithModel(false))
                    context.Database.Delete();

                if (!context.Database.Exists())
                {
                    context.Database.Create();
                    context.Database.ExecuteSqlCommand(
                       @"alter table Servers 
                         add constraint UniqueServerName unique (Name)");
                }
            }
        }
    }

现在我需要一种在保存时选择性地忽略异常的方法。我通过将以下成员添加到我的存储库来做到这一点:

readonly List<Func<Exception, bool>> ExceptionsIgnoredOnSave = 
    new List<Func<Exception, bool>>();

static readonly Func<Exception, bool> UniqueConstraintViolation =
    e => e.AnyMessageContains("Violation of UNIQUE KEY constraint");

以及一个新的扩展方法,可以根据内部异常链中文本的位置进行循环保持:

public static class Ext
{
    public static bool AnyMessageContains(this Exception ex, string text)
    {
        while (ex != null)
        {
            if(ex.Message.Contains(text))
                return true;
            ex = ex.InnerException;
        }
        return false;
    }
}

我修改了我的 Repository 的 Dispose 方法来检查是否应该忽略或重新抛出异常:

    public void Dispose()
    {
        if (autoSave)
        {
            try
            {
                context.SaveChanges();
            }
            catch (Exception ex)
            {      
                if(!ExceptionsIgnoredOnSave.Any(pass => pass(ex)))
                    throw;
                Console.WriteLine("ignoring exception..."); // temp
            }
        }
        context.Dispose();
    }

最后,在调用 的方法中Add,我将可接受的条件添加到列表中:

    public void AddServer(string name)
    {
        ExceptionsIgnoredOnSave.Add(UniqueConstraintViolation);

        if (!context.Servers.Any(c => c.Name == name))
        {
            var server = context.Servers.Add(new Server { Name = name });
        }
    }
于 2013-07-09T08:10:06.077 回答