1

使用的 NServiceBus 版本:2.0.0.1145

问题:

是否可以配置 NServiceBus 主机,使其消费(订阅)他自己发布的消息?

回答:

这似乎是可能的,但在下面的配置中,它在尝试将订阅插入订阅存储时给了我一个事务死锁异常。当您使用 DbSubscriptionStorage 和超过 1 个“NumberOfWorkerThreads”时会发生这种情况。

错误:

Could not execute command:
INSERT INTO Subscription (SubscriberEndpoint, MessageType) VALUES (@p0, @p1)
System.Data.SqlClinet.SqlException:
Transaction was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

之后,NServiceBus 尝试断开连接但失败,因为事务仍在进行中并引发 UnhandledException。

如何重现:

这是我的 App.Config:

<!-- Publishing Configuration -->
<MsmqTransportConfig InputQueue="test_publisher_output" ErrorQueue="test_error" NumberOfWorkerThreads="3" MaxRetries="5" />

<!-- Subscription Configuration -->
<UnicastBusConfig DistributorControlAddress="" DistributorDataAddress="" ForwardReceivedMessagesTo="">
    <MessageEndpointMappings>
        <add Messages="MessageAssembly" Endpoint="test_publisher_output" />
    </MessageEndpointMappings>
</UnicastBusConfig>

我的总线配置:

var bus = Configure.With()
    .Log4Net()
    .StructureMapBuilder(container)
    .XmlSerializer()
    .MsmqTransport()
        .IsTransactional(true)
        .PurgeOnStartup(false)
    .DBSubcriptionStorage(subscriptionDbProperties, true)
    .Sagas()
    .NHibernateSagaPersister(sagaDbProperties, true)
    .UnicastBus()
        .ImpersonateSender(false)
        .LoadMessageHandlers(First<GridInterceptingMessageHandler>
            .Then<SagaMessageHandler>())
    .CreateBus()
    .Start();

这是我的订阅和传奇数据库的 dbProperties:

connection.provider      NHibernate.Connection.DriverConnectionProvider
connection.driver_class  NHibernate.Driver.SqlClientDriver
dialect                  NHibernate.Dialect.MsSql2005Dialect

只要我不将 NumberOfWorkerThreads 增加到 1 以上,一切都可以正常工作。高于此的所有内容都会引发上述错误。

我希望我没有忘记任何事情。提前感谢您的帮助。

4

2 回答 2

1

如果您希望相同的进程来处理已发布的消息,最好在 Bus.Publish() 之后执行 Bus.SendLocal()。SendLocal() 方法将在本地队列上放置一条消息,您的内部处理程序将拾取并处理它。这将摆脱你的死锁,但保持相同的语义。

于 2010-10-15T17:12:41.207 回答
-1

我真的会考虑重新设计这个组件。如果您想要 nservicebus 为您提供的稳定性,并且您已经分解了组件,因此处理的每个部分都在单独的消息处理程序中,请将每个消息处理程序放在具有单独队列的单独可执行文件中。如果这不可能,那么您还没有真正获得 nservicebus 的稳定性,因为您被其他一些代码锁定,在这种情况下,您应该直接调用所需的函数。

如果您只是在一个队列中测试它们,那么在您测试时也将它们分开。确实没有理由订阅您自己的消息 - 如果可能,将处理程序拆分为单独的端点,如果不可能,则直接调用函数。

于 2010-10-25T10:55:28.753 回答