14

当您使用 Session-Per-Request 模式时,您在使用 NHibernate 需要支持重试事务失败的 3 层应用程序中使用什么模式/架构?(因为 ISession 在异常后变得无效,即使这是死锁或超时或活锁异常)。

4

1 回答 1

37

注意 2现在我永远不会将写事务放在 web 项目中 - 而是使用消息传递 + 队列,并在后台有一个工作人员处理旨在完成事务工作的消息。

但是,我仍然会使用事务进行读取以获取一致的数据;与 MVCC/Snapshot 隔离一起,来自 Web 项目。在这种情况下,您会发现每请求每事务的会话非常好。

注 1这篇文章的想法已经放在Castle Transactions 框架和我的新NHibernate Facility中。

好的,这是一般的想法。假设您要为客户创建一个未完成的订单。您有某种 GUI,例如浏览器/MVC 应用程序,它使用相关信息创建一个新的数据结构(或者您从网络获取此数据结构):

[Serializable]
class CreateOrder /*: IMessage*/
{
    // immutable
    private readonly string _CustomerName;
    private readonly decimal _Total;
    private readonly Guid _CustomerId;

    public CreateOrder(string customerName, decimal total, Guid customerId)
    {
        _CustomerName = customerName;
        _Total = total;
        _CustomerId = customerId;
    }

    // put ProtoBuf attribute
    public string CustomerName
    {
        get { return _CustomerName; }
    }

    // put ProtoBuf attribute
    public decimal Total
    {
        get { return _Total; }
    }

    // put ProtoBuf attribute
    public Guid CustomerId
    {
        get { return _CustomerId; }
    }
}

你需要一些东西来处理它。这可能是某种服务总线中的命令处理程序。“命令处理程序”一词是其中之一,您不妨将其称为“服务”或“域服务”或“消息处理程序”。如果你在做函数式编程,那将是你的消息框实现,或者如果你在做 Erlang 或 Akka,那将是一个 Actor。

class CreateOrderHandler : IHandle<CreateOrder>
{
    public void Handle(CreateOrder command)
    {
        With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
        {
            var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
            potentialCustomer.CreateOrder(command.Total);
            return potentialCustomer;
        }, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
    }
}

interface IHandle<T> /* where T : IMessage */
{
    void Handle(T command);
}

上面显示了您可能为这个给定的问题域(应用程序状态/事务处理)选择的 API 用法。

With的实现:

static class With
{
    internal static void Policy(Func<ISession> getSession,
                                       Func<ISession, ITransaction> getTransaction,
                                       Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
                                       IRetryPolicy policy)
    {
        //http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html

        while (true)
        {
            using (var session = getSession())
            using (var t = getTransaction(session))
            {
                var entity = executeAction(session);
                try
                {
                    // we might not always want to update; have another level of indirection if you wish
                    session.Update(entity);
                    t.Commit();
                    break; // we're done, stop looping
                }
                catch (ADOException e)
                {
                    // need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception

                    // but the session is now broken in all other regards will will throw exceptions
                    // if you prod it in any other way
                    session.Evict(entity);

                    if (!t.WasRolledBack) t.Rollback(); // will back our transaction

                    // this would need to be through another level of indirection if you support more databases
                    var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;

                    if (policy.PerformRetry(dbException)) continue;
                    throw; // otherwise, we stop by throwing the exception back up the layers
                }
            }
        }
    }
}

如您所见,我们需要一个新的工作单元;每次出现问题时的 ISession。这就是为什么循环位于 Using 语句/块之外的原因。拥有函数相当于拥有工厂实例,只是我们直接在对象实例上调用,而不是在其上调用方法。它提供了一个更好的调用者 API 恕我直言。

我们希望相当顺利地处理我们如何执行重试,因此我们有一个可以由不同的处理程序实现的接口,称为 IRetryHandler。应该可以将这些链接到您想要强制执行控制流的每个方面(是的,它非常接近 AOP)。与 AOP 的工作方式类似,返回值用于控制控制流,但仅以真/假方式,这是我们的要求。

interface IRetryPolicy
{
    bool PerformRetry(SqlException ex);
}

AggregateRoot、PotentialCustomer 是一个具有生命周期的实体。这就是您将使用 *.hbm.xml 文件/FluentNHibernate 映射的内容。

它有一个与发送的命令 1:1 对应的方法。这使得命令处理程序完全易于阅读。

此外,使用带有鸭子类型的动态语言,它允许您将命令的类型名称映射到方法,类似于 Ruby/Smalltalk 的做法。

如果您在进行事件溯源,则事务处理将是相似的,除了事务不会与 NHibernate 的接口交互。推论是您将保存通过调用 CreateOrder(decimal) 创建的事件,并为您的实体提供从存储中重新读取已保存事件的机制。

最后要注意的一点是,我覆盖了我创建的三个方法。这是 NHibernate 方面的要求,因为它需要一种知道一个实体何时与另一个实体相等的方法,它们是否应该在集合/包中。更多关于我在这里的实现。无论如何,这是示例代码,我现在不关心我的客户,所以我没有实现它们:

sealed class PotentialCustomer : EntityBase
{
    public void CreateOrder(decimal total)
    {
        // validate total
        // run business rules

        // create event, save into event sourced queue as transient event
        // update private state
    }

    public override bool IsTransient() { throw new NotImplementedException(); }
    protected override int GetTransientHashCode() { throw new NotImplementedException(); }
    protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}

我们需要一种创建重试策略的方法。当然,我们可以通过多种方式做到这一点。在这里,我将一个流畅的接口与一个与静态方法的类型相同的对象的实例相结合。我显式地实现了接口,因此在流畅的接口中没有其他方法可见。此接口仅使用下面我的“示例”实现。

internal class RetryPolicies : INonConfiguredPolicy
{
    private readonly IRetryPolicy _Policy;

    private RetryPolicies(IRetryPolicy policy)
    {
        if (policy == null) throw new ArgumentNullException("policy");
        _Policy = policy;
    }

    public static readonly INonConfiguredPolicy ExponentialBackOff =
        new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));

    IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
    {
        return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
    }
}

我们需要一个接口来部分完整地调用 fluent 接口。这给了我们类型安全性。因此,在完成策略配置之前,我们需要远离我们的静态类型的两个取消引用运算符(即“句号”-- (.))。

internal interface INonConfiguredPolicy
{
    IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}

可以解决链接策略。它的实现检查所有子节点是否继续返回,并且在检查时,它还执行它们中的逻辑。

internal class ChainingPolicy : IRetryPolicy
{
    private readonly IEnumerable<IRetryPolicy> _Policies;

    public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
    {
        if (policies == null) throw new ArgumentNullException("policies");
        _Policies = policies;
    }

    public bool PerformRetry(SqlException ex)
    {
        return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
    }
}

这个策略让当前线程休眠一段时间;有时数据库过载,并且让多个读取器/写入器不断尝试读取将是对数据库的事实上的 DOS 攻击(请参阅几个月前 Facebook 崩溃时发生的事情,因为他们的缓存服务器都同时查询他们的数据库时间)。

internal class ExponentialBackOffPolicy : IRetryPolicy
{
    private readonly TimeSpan _MaxWait;
    private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait

    public ExponentialBackOffPolicy(TimeSpan maxWait)
    {
        _MaxWait = maxWait;
    }

    public bool PerformRetry(SqlException ex)
    {
        Thread.Sleep(_CurrentWait);
        _CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
        return _CurrentWait <= _MaxWait;
    }
}

同样,在任何好的基于 SQL 的系统中,我们都需要处理死锁。我们无法真正深入地计划这些,尤其是在使用 NHibernate 时,除了保持严格的事务策略——没有隐式事务;并小心Open-Session-In-View。如果您要获取大量数据,还需要记住笛卡尔积问题/N+1 选择问题。取而代之的是,您可能有 Multi-Query 或 HQL 的 'fetch' 关键字。

internal class SqlServerRetryPolicy : IRetryPolicy
{
    private int _Tries;
    private readonly int _CutOffPoint;

    public SqlServerRetryPolicy(int cutOffPoint)
    {
        if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
        _CutOffPoint = cutOffPoint;
    }

    public bool PerformRetry(SqlException ex)
    {
        if (ex == null) throw new ArgumentNullException("ex");
        // checks the ErrorCode property on the SqlException
        return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
    }
}

一个帮助类,使代码更好读。

internal static class SqlServerExceptions
{
    public static bool IsThisADeadlock(SqlException realException)
    {
        return realException.ErrorCode == 1205;
    }
}

不要忘记在 IConnectionFactory 中处理网络故障(可能通过实现 IConnection 进行委派)。


PS:如果您不只是在阅读,那么每个请求的会话是一种破坏模式。特别是如果您正在使用与您正在编写的相同 ISession 进行阅读,并且您没有对读取进行排序,因此它们总是在写入之前。

于 2011-02-10T12:30:55.220 回答