0

尽管处理程序中没有引发异常,但 NServiceBus 重发消息 X 次时遇到了一个奇怪的问题。有一些关于 NHibernate 会话和 NSB 环境事务的信息。由于没有引发错误,我不能 100% 确定问题所在,因此无法真正决定要做什么。

我将 NSB 配置为 Castle Windsor,如下所示:

IWindsorContainer container = new WindsorContainer(new XmlInterpreter());
container.Install(new ContainerInstaller());
container.Install(new UnitOfWorkInstaller(AppDomain.CurrentDomain.BaseDirectory, Castle.Core.LifestyleType.Scoped));
container.Install(new FactoryInstaller(AppDomain.CurrentDomain.BaseDirectory));
container.Install(new RepositoryInstaller(AppDomain.CurrentDomain.BaseDirectory));

Configure.With()
    .CastleWindsorBuilder(container)
    .FileShareDataBus(Properties.Settings.Default.DataBusFileSharePath)
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .UnicastBus()
         .LoadMessageHandlers()
         .ImpersonateSender(false)
     .JsonSerializer();

像这样UnitOfWorkInstaller注册工作单元(NHibernate 会话):

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    var fromAssemblyDescriptor = AllTypes.FromAssemblyInDirectory(new AssemblyFilter(_installationPath));
    container.Register(fromAssemblyDescriptor
        .IncludeNonPublicTypes()
        .Pick()
        .If(t => t.GetInterfaces().Any(i => i == typeof(IUnitOfWork)) && t.Namespace.StartsWith("Magma"))
        .WithService.AllInterfaces()
        .Configure(con => con.LifeStyle.Is(_lifeStyleType).UsingFactoryMethod(k => k.Resolve<IUnitOfWorkFactory>().Create())));
}

因此,每次消息到达时,所有存储库都使用相同的工作单元。我读到手动回滚当前事务会导致错误(我真的不知道为什么),而且我还知道 NSB 为每个传输消息创建一个子容器,并且这个子容器在消息处理后被释放。问题是当子容器被处理时,工作单元是这样处理的:

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        if (_transaction != null && _transaction.IsActive)
        {
            _transaction.Dispose();
        }
        if (Session != null)
        {
            Session.Dispose();
        }
    }

我的处理程序的结构如下:(_unitOfWork 作为构造函数依赖项传递)

    public void Handle(<MessageType> message)
    {
        using (_unitOfWork)
        {
            try
            {
                // do stuff
                _unitOfWork.Commit();
            }
            catch (Exception ex)
            {
                _unitOfWork.Rollback();

                // rethrow so the message stays in the queue
                throw;
            }
        }
    }

我发现如果我不提交工作单元(刷新会话并提交事务),我会收到一条错误消息,提示消息已重试超过最大重试计数 bla bla bla ...

所以它似乎与 NHibernate 会话以及它的创建和处理方式有关,但由于它是在工作单元中创建的,所以我不能真正使用会话工厂。我读到我可以使用 IMessageModule 来创建和处理会话,但是我不知道这是否是正确的方法,因为我首先不明白是什么导致了错误。

回顾一下:

  • 我正在使用一个范围内的工作单元,以便所有使用它的处理程序依赖项将共享相同的实例(感谢子容器,顺便说一句:我已经将工作单元设置为瞬态,认为子容器将处理所有瞬态对象作为该容器中的单例,但我看到工作单元没有共享,所以这就是为什么它设置为范围)

  • 我将我的处理程序包装在一个using(_unitOfWork) { }语句中,以在每次处理后处理工作单元。

  • 当工作单元被释放时,NHibernate 会话也被释放

  • 如果我没有显式调用Commit_unitOfWork则消息重试超出最大重试次数,然后引发错误。

是什么导致了这种行为?IMessageModule 是这个问题的答案吗?

4

1 回答 1

0

我想我把它缩小了一点......我删除了所有的using(_unitOfWork)_unitOfWork.Commit()和 ,_unitOfWork.Rollback()并让 NSBTransactionScope完成提交或回滚事务的工作,因为 NHibernate 的 Session 正在 NSB 事务范围内登记。

我还开始使用 NHibernate 会话的事务 ( Session.Transaction),而不是通过Session.BeginTransaction()并使用它来获取对它的引用。我已经复制/粘贴了我的 UoW 实现,以便您可以看到差异(旧代码在注释中)。

我不知道我的更改是否解释了除了使用会话的事务并删除刷新,因为它在事务提交中处理似乎已经解决了问题......我不必Commit按顺序显式调用该方法以便消息被成功处理。这是我的 UoW 实现:

public class NHibernateUnitOfWork : INHibernateUnitOfWork
{
    //private ITransaction _transaction;
    private bool _isDisposed;
    private bool _isInError;

    public ISession Session { get; protected set; }

    public NHibernateUnitOfWork(ISession session)
    {
        Contract.Requires(session != null, "session");
        Session = session;

        //_transaction = Session.BeginTransaction();

        // create a new transaction as soon as the session is available
        Session.BeginTransaction();
        _isDisposed = false;
        _isInError = false;
    }

    public void MarkCreated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.Update(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void MarkSavedOrUpdated(Object entity)
    {
        // assert stuff

        try
        {
            Session.SaveOrUpdate(entity);
            //Session.Flush();
        }
        catch (HibernateException)
        {
            HandleError();
            throw;
        }
    }

    public void MarkDeleted(Object entity)
    {
        // assert stuff

        try
        {
            Session.Delete(entity);
            //Session.Flush();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Commit()
    {
        // assert stuff

        try
        {
            //Session.Flush();
            //_transaction.Commit();
            Session.Transaction.Commit();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Rollback()
    {
        // assert stuff

        try
        {
            //if (!_transaction.WasRolledBack)
            //{
            //    _transaction.Rollback();
            //}
            Session.Transaction.Rollback();
        }
        catch (HibernateException ex)
        {
            HandleError();
            throw;
        }
    }

    public void Dispose()
    {
        if (!_isDisposed)
        {
            DiscardSession();
            _isDisposed = true;
        }
    }

    private void DiscardSession()
    {
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Dispose();
        //}
        if (Session != null)
        {
            try
            {
                // rollback all uncommitted changes
                if (Session.Transaction != null && Session.Transaction.IsActive)
                {
                    Session.Transaction.Rollback();
                }
                //Session.Clear();
                Session.Close();
            }
            catch (Exception)
            { }
            finally
            {
                Session.Dispose();
            }
        }
    }

    private void HandleError()
    {
        _isInError = true;
        //if (_transaction != null && _transaction.IsActive)
        //{
        //    _transaction.Rollback();
        //}
        if (Session.Transaction != null && Session.Transaction.IsActive)
        {
            Session.Transaction.Rollback();
        }
    }

    // assert methods
}

这有什么意义吗?我仍然不知道是什么首先导致了错误,但这似乎与在事务范围完成之前处理 NHibernate Session 有关。

于 2012-04-12T15:36:32.320 回答