2012-04-11 70 views
0

虽然在处理程序中没有引发异常,但我得到了NServiceBus重试消息X次的奇怪问题。有一些信息处理NHibernate会话和NSB环境事务。由于没有错误发生,我不是100%确定的问题,因此不能真正决定做什么。即使没有异常,NServiceBus重试消息

我被配置NSB与城堡温莎像这样:

IWindsorContainer container = new WindsorContainer(new XmlInterpreter()); 
container.Install(new ContainerInstaller()); 
container.Install(new UnitOfWorkInstaller(AppDomain.CurrentDomain.BaseDirectory, Castle.Core.LifestyleType.Scoped)); 
container.Install(new FactoryInstaller(AppDomain.CurrentDomain.BaseDirectory)); 
container.Install(new RepositoryInstaller(AppDomain.CurrentDomain.BaseDirectory)); 

Configure.With() 
    .CastleWindsorBuilder(container) 
    .FileShareDataBus(Properties.Settings.Default.DataBusFileSharePath) 
    .MsmqTransport() 
     .IsTransactional(true) 
     .PurgeOnStartup(false) 
    .UnicastBus() 
     .LoadMessageHandlers() 
     .ImpersonateSender(false) 
    .JsonSerializer(); 

UnitOfWorkInstaller寄存器工作(NHibernate会话)的单位像这样:

public void Install(IWindsorContainer container, IConfigurationStore store) 
{ 
    var fromAssemblyDescriptor = AllTypes.FromAssemblyInDirectory(new AssemblyFilter(_installationPath)); 
    container.Register(fromAssemblyDescriptor 
     .IncludeNonPublicTypes() 
     .Pick() 
     .If(t => t.GetInterfaces().Any(i => i == typeof(IUnitOfWork)) && t.Namespace.StartsWith("Magma")) 
     .WithService.AllInterfaces() 
     .Configure(con => con.LifeStyle.Is(_lifeStyleType).UsingFactoryMethod(k => k.Resolve<IUnitOfWorkFactory>().Create()))); 
} 

因此,每个时间的消息到达时所有存储库都使用相同的工作单元。我读过手动回滚当前事务结果的错误(我真的不知道为什么),我也知道NSB为每个传输消息创建一个子容器,并且此子容器在处理消息后处理。问题是,当孩子容器布置工作单元配置是这样的:

public void Dispose() 
    { 
     if (!_isDisposed) 
     { 
      DiscardSession(); 
      _isDisposed = true; 
     } 
    } 

    private void DiscardSession() 
    { 
     if (_transaction != null && _transaction.IsActive) 
     { 
      _transaction.Dispose(); 
     } 
     if (Session != null) 
     { 
      Session.Dispose(); 
     } 
    } 

我的处理程序结构是这样的:(该_unitOfWork作为一个构造函数依赖通过)

public void Handle(<MessageType> message) 
    { 
     using (_unitOfWork) 
     { 
      try 
      { 
       // do stuff 
       _unitOfWork.Commit(); 
      } 
      catch (Exception ex) 
      { 
       _unitOfWork.Rollback(); 

       // rethrow so the message stays in the queue 
       throw; 
      } 
     } 
    } 

我发现,如果我没有提交工作单元(刷新会话并提交交易),我得到一个错误,表示邮件已重试超过最大重试次数bla bla bla ...

因此,它似乎与NHibernate会话和它的创建和处理方式,但是由于它是在工作单元内创建的,所以我无法真正使用会话工厂。我读过我可以使用IMessageModule来创建和处理会话,但我不知道这是否正确,因为我不明白是什么导致了错误。

因此,要回顾:

  • 我使用的工作范围的单位,使所有的处理程序相依性将共享相同的实例(THX到子容器,BTW:我设置作为临时思考的工作单元,子容器将所有瞬态对象视为该容器内的单例,但我看到工作单元未被共享,因此这就是为什么它的设置范围)

  • 我是将我的处理程序包装在using(_unitOfWork) { }声明中,以便在每次处理后处理工作单元。

  • 当下班的单位设置,NHibernate会话也设置

  • 如果我不显式调用Commit_unitOfWork,消息重试超出最大重试次数,然后抛出一个错误。

是什么导致了这种行为?而IMessageModule就是这个答案吗?

+0

查看错误队列中的消息。消息头中是否有异常消息? – stephenl 2012-04-12 00:30:01

+0

@stephenl Nope,我看到的唯一信息是与''相同的信息。它似乎与NHibernate有关,但我没有任何与NHibernate的NSB集成。 – 2012-04-12 12:19:44

回答

0

我想我缩小了一下...我删除了所有的using(_unitOfWork)_unitOfWork.Commit()_unitOfWork.Rollback(),并让NSB TransactionScope完成提交或回滚事务的工作,因为NHibernate的会话正在参与NSB事务范围。

我也开始使用NHibernate会话的事务(Session.Transaction),而不是通过Session.BeginTransaction()获取对它的引用并使用它。我复制/粘贴了我的UoW实现,以便您可以看到差异(旧代码在注释中)。

我不知道我的更改是否解释了什么,但使用Session的事务并删除冲洗,因为它正在处理事务中提交似乎解决了问题......我不必显式调用为了使消息被成功处理,使用了Commit方法。这是我的UoW执行:

public class NHibernateUnitOfWork : INHibernateUnitOfWork 
{ 
    //private ITransaction _transaction; 
    private bool _isDisposed; 
    private bool _isInError; 

    public ISession Session { get; protected set; } 

    public NHibernateUnitOfWork(ISession session) 
    { 
     Contract.Requires(session != null, "session"); 
     Session = session; 

     //_transaction = Session.BeginTransaction(); 

     // create a new transaction as soon as the session is available 
     Session.BeginTransaction(); 
     _isDisposed = false; 
     _isInError = false; 
    } 

    public void MarkCreated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.SaveOrUpdate(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkUpdated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.Update(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkSavedOrUpdated(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.SaveOrUpdate(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void MarkDeleted(Object entity) 
    { 
     // assert stuff 

     try 
     { 
      Session.Delete(entity); 
      //Session.Flush(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Commit() 
    { 
     // assert stuff 

     try 
     { 
      //Session.Flush(); 
      //_transaction.Commit(); 
      Session.Transaction.Commit(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Rollback() 
    { 
     // assert stuff 

     try 
     { 
      //if (!_transaction.WasRolledBack) 
      //{ 
      // _transaction.Rollback(); 
      //} 
      Session.Transaction.Rollback(); 
     } 
     catch (HibernateException ex) 
     { 
      HandleError(); 
      throw; 
     } 
    } 

    public void Dispose() 
    { 
     if (!_isDisposed) 
     { 
      DiscardSession(); 
      _isDisposed = true; 
     } 
    } 

    private void DiscardSession() 
    { 
     //if (_transaction != null && _transaction.IsActive) 
     //{ 
     // _transaction.Dispose(); 
     //} 
     if (Session != null) 
     { 
      try 
      { 
       // rollback all uncommitted changes 
       if (Session.Transaction != null && Session.Transaction.IsActive) 
       { 
        Session.Transaction.Rollback(); 
       } 
       //Session.Clear(); 
       Session.Close(); 
      } 
      catch (Exception) 
      { } 
      finally 
      { 
       Session.Dispose(); 
      } 
     } 
    } 

    private void HandleError() 
    { 
     _isInError = true; 
     //if (_transaction != null && _transaction.IsActive) 
     //{ 
     // _transaction.Rollback(); 
     //} 
     if (Session.Transaction != null && Session.Transaction.IsActive) 
     { 
      Session.Transaction.Rollback(); 
     } 
    } 

    // assert methods 
} 

这是否有任何意义?我仍然不知道是什么造成了错误,但它似乎与事务处理范围完成之前处理NHibernate会话有关。

+0

我只注意到它只在UoW设置为瞬态时才起作用。如果它被设置为作用域,则不起作用。我认为Castle Windsor支持儿童容器,但它似乎不起作用......如果我的处理程序有两个依赖关系,一个是UoW本身,另一个是也对UoW有依赖关系的存储库,他们赢了不一样。 – 2012-04-12 16:02:39