问题标签 [nservicebus-sagas]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
245 浏览

c# - 使用 NServicebus saga 序列化长时间运行的端点处理程序的执行

我们正在尝试使用 Saga 序列化业务对象列表的处理。

现在,在没有 Saga 的情况下,我们只需遍历对象列表,然后触发bus.Send(new ProcessBusinessObejct(obj))async 以执行处理程序。所以处理或多或少是并行发生的,取决于这个设置,我相信:

这工作正常,但并发处理程序的数量现在很难在数据库上。

可以串联触发这些处理程序,即仅在当前进程完成(失败或成功)时继续下一个处理程序。我们不想将并发设置为 1,它会影响端点中的所有处理程序。

这个想法是使用 Scatter/Gather 模式和 Saga 来跟踪对象的数量并使用计数(总计数、失败计数、成功计数)更新状态机,最后在列表出现时触发事件完成/空。

问题是

A)我不确定如何跟踪传奇中的列表。SagaData 需要一个列表来保存所有对象吗?然后在处理程序发出信号完成处理时删除一个实例。 saga 不支持分层数据,因此不支持 List 或 List。我相信在 NSB v7 中仍然是这种情况。

B) 使用 saga 是可行的还是矫枉过正,还是有更简单的方法来实现这一点?

我们正在使用 Sql Server 持久性和传输以及 NSB 7。

非常感谢任何输入!

0 投票
1 回答
54 浏览

nservicebus - 使用具有可恢复性的 Sagas

我们在恢复源自 Sagas 的消息时遇到问题。

当 Saga 发送消息进行处理时,消息处理程序有时会因异常而失败。我们目前使用 try/catch,当抛出异常时,我们“回复”一个失败的消息给 Saga。这种方法的问题是不会发生可恢复性重试,因为我们正在处理消息处理程序中的错误。

我的想法是向管道添加自定义逻辑,如果命令消息实现了一些特殊的接口,如果发生异常(重试失败后),自定义逻辑将向 Saga 发送失败的消息响应,但我不确定在哪里插入允许我在重试失败后发送消息的管道。

这是一种有效的方法吗?如果没有,重试后如何解决 Saga to Handler 失败消息?

0 投票
0 回答
171 浏览

.net - NServiceBus - 无法加载文件或程序集“”或其依赖项之一

我正在使用一个使用 NServiceBus (v4.7.0) 队列中的消息的 .Net Web 应用程序。我收到以下异常:

2019-02-20 15:35:13,624 [28] 错误 NServiceBus.Unicast.Transport.TransportReceiver [(null)] - [(null)] - 无法反序列化 ID 为 0942d9eb-126e-4af6-b860-a9f400a3be82 系统的消息.Runtime.Serialization.SerializationException:尝试从传输消息 NServiceBus.TransportMessage ---> System.IO.FileLoadException 提取逻辑消息时发生错误:无法加载文件或程序集'xpto.Contracts,版本 = 1.0.0.0,文化=neutral, PublicKeyToken=6a17a7d57c13b3cf' 或其依赖项之一。找到的程序集的清单定义与程序集引用不匹配。(来自 HRESULT 的异常:0x80131040)---> System.IO.FileLoadException:定位的程序集的清单定义与程序集引用不匹配。(HRESULT 的例外情况:
在 System.RuntimeTypeHandle.GetTypeByName(字符串名称,布尔 throwOnError,布尔 ignoreCase,布尔反射仅,StackCrawlMarkHandle stackMark,IntPtr pPrivHostBinder,布尔 loadTypeFromPartialName,ObjectHandleOnStack 类型)在 System.RuntimeTypeHandle.GetTypeByName(字符串名称,布尔 throwOnError,布尔 ignoreCase,布尔反射, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean loadTypeFromPartialName) at System.RuntimeType.GetType(String typeName, Boolean throwOnError, Boolean ignoreCase, Boolean reflectOnly, StackCrawlMark& stackMark) 在 System.Type.GetType(String typeName, Boolean throwOnError) 在 NServiceBus.Unicast.Messages c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus 中的 .MessageMetadataRegistry.GetMessageMetadata(String messageTypeIdentifier)。Core\Unicast\Messages\MessageMetadataRegistry.cs:C:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Messages\ExtractLogicalMessagesBehavior.cs 中 NServiceBus.Unicast.Messages.ExtractLogicalMessagesBehavior.Extract(TransportMessage physicalMessage) 的第 52 行: C:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Messages\ExtractLogicalMessagesBehavior.cs:line 52 中 NServiceBus.Unicast.Messages.ExtractLogicalMessagesBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) 的第 75 行 --- 结束内部异常堆栈跟踪 --- 在 C:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Messages\ExtractLogicalMessagesBehavior.cs 中的 NServiceBus.Unicast.Messages.ExtractLogicalMessagesBehavior.Invoke(ReceivePhysicalMessageContext context, Action next):NServiceBus.Pipeline.BehaviorChain 的第 56 行1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 35 at NServiceBus.Sagas.RemoveIncomingHeadersBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Sagas\RemoveIncomingHeadersBehavior.cs:line 23 at NServiceBus.Pipeline.BehaviorChain1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:第 35 行 NServiceBus.Unicast.Behaviors.RaiseMessageReceivedBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\ BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\RaiseMessageReceivedBehavior.cs:第 18 行在 NServiceBus.Pipeline.BehaviorChain 1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 35 at NServiceBus.MessageMutator.ApplyIncomingTransportMessageMutatorsBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\MessageMutator\ApplyIncomingTransportMessageMutatorsBehavior.cs:line 22 at NServiceBus.Pipeline.BehaviorChain1.Invoke() 在 c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline \BehaviorChain.cs:第 35 行 NServiceBus.UnitOfWork.UnitOfWorkBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) 在 c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\UnitOfWork\UnitOfWorkBehavior.cs:第 34 行 NServiceBus.Pipeline .行为链1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 35 at NServiceBus.Unicast.Behaviors.ForwardBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\ForwardBehavior.cs:line 24 at NServiceBus.Pipeline.BehaviorChain1.Invoke() 在 c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:第 35 行 NServiceBus.Audit.AuditBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\ work\1b05a2fea6e4cd32\src\NServiceBus.Core\Audit\AuditBehavior.cs:第 17 行 NServiceBus.Pipeline.BehaviorChain 1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 35 at NServiceBus.Unicast.Behaviors.ImpersonateSenderBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\ImpersonateSenderBehavior.cs:line 37 at NServiceBus.Pipeline.BehaviorChain1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs :NServiceBus.Unicast.Behaviors.MessageHandlingLoggingBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\MessageHandlingLoggingBehavior.cs 的第 35 行:NServiceBus.Pipeline 的第 21 行.行为链1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 35 at NServiceBus.Unicast.Behaviors.ChildContainerBehavior.Invoke(ReceivePhysicalMessageContext context, Action next) in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Behaviors\ChildContainerBehavior.cs:line 17 at NServiceBus.Pipeline.BehaviorChain1.Invoke() in c:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Pipeline\BehaviorChain.cs: 第 35 行 NServiceBus.Pipeline.PipelineExecutor.InvokeReceivePhysicalMessagePipeline() 在 c:\BuildAgent\work\1b05a2fea6e4cd32\src \NServiceBus.Core\Pipeline\PipelineExecutor.cs:C:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\UnicastBus.cs 中 NServiceBus.Unicast.UnicastBus.TransportMessageReceived(Object sender, TransportMessageReceivedEventArgs e) 的第 44 行: C:\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs 中 NServiceBus.Unicast.Transport.TransportReceiver.OnTransportMessageReceived(TransportMessage msg) 的第 1099 行:NServiceBus.Unicast.Transport.TransportReceiver 的第 439 行c中的.ProcessMessage(TransportMessage消息):\BuildAgent\work\1b05a2fea6e4cd32\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs:356 行

有趣的是,我更改了“xpto.Contracts”签名,它不再具有 PublicKeyToken=6a17a7d57c13b3cf',找不到对该密钥令牌的任何引用。这是从哪里来的?

0 投票
1 回答
150 浏览

nservicebus - NServiceBus Sagas 中使用了哪些事务?

我使用 Azure 表存储进行持久化,使用 Azure 服务总线进行传输,我想知道 saga 的处理程序中有哪些事务?它和普通的处理程序一样吗?

我之所以问,是因为我看到数据库更改(通常在环境事务中登记的 SqlBulkCopy)发生多次。在这种情况下,我直接从 Saga 访问数据库以“单线程”处理消息,但它似乎不起作用。

0 投票
1 回答
261 浏览

nservicebus - 将 NServiceBus Saga 与 ConversationId 相关联

我目前有一些 sagas 在基本消息类上传递自定义 CorrelationId。所有事件和命令都继承自该基类,因此可以轻松访问 CorrelationId。结果,ConfigureHowToFindSaga映射看起来像这样:

在 saga 的每个步骤(以及 saga 与之通信的其他服务)中,自定义 CorrelationId 当前手动从消息映射到消息,如下所示:

我想知道是否可以使用 NSBConversationId消息头来替换自定义 CorrelationId,因为我相信 NSB 已经自动将其从消息映射到消息。

我对此有两个问题:
1. 这听起来合理吗?
2. 如果是这样,是否有一种简单的方法设置ConfigureMapping映射,以便 saga 从context.MessageHeaders[Headers.ConversationId]消息正文中的属性读取而不是直接从属性读取?我意识到我可以编写一个自定义 SagaFinder 来挖掘标题并获取 ConversationId 并使用它来查找 saga。但是,我认为如果我需要为每个 saga 都这样做,那么整个方法可能是错误的。谢谢

0 投票
1 回答
132 浏览

sql-server - NServiceBus 不会自动创建架构 (SQL Server)

我希望我的 NServiceBus 传奇存在于单独的 SQL Server 架构中。为 sagas 创建表的自动生成的 SQL 脚本确实使用自定义模式名称,但如果模式不存在则不创建模式,因此必须事先手动创建模式。

这种行为是设计使然吗?在执行自动生成的 SQL 之前,是否有一个钩子可以用来包含创建模式的自定义代码?

配置如下所示:

0 投票
0 回答
135 浏览

c# - 处理程序多次执行问题

Nservicebus 处理程序面临的问题。处理程序执行多次。请在下面找到代码。运行此代码后面临问题,letthandler 类中的处理程序被多次执行。

Lett 处理程序类

行为类

处理程序连接器类

似乎在执行代码之后
等待 context.Publish(context.Message.Instance, this.RetrieveOptions(context)).ConfigureAwait(false); 在 Behavior 类中,lett 处理程序句柄被执行,并且该循环将继续。不确定是什么问题。最近项目参考从 4 更新到 7.2 Nservicebus。有什么建议么?

0 投票
1 回答
158 浏览

nservicebus - 如何在负载下提高我的 NServiceBus Saga 的性能

我有一个使用 SQL 传输和 NHibernate 持久性使用 NSB7 构建的非常简单的 Saga。

Saga 侦听一个队列,接收到的每条消息都通过 4 个处理程序运行。它们按顺序调用,2 个处理程序并行运行,最后一个处理程序仅在两个并行处理程序完成后运行。最后一个处理程序将记录写入 DB

假设对于一条消息,每个处理程序需要 1 秒。当收到一条启动 Saga 的新消息时,预期结果是 3-4 秒后记录被写入数据库。

如果队列备份了 1000 条消息,一旦它们再次开始处理,则在最后一个处理程序中创建新记录之前需要将近 2000 秒。基本上,它们不是为每条消息运行预期的 4 秒处理时间,而是有效地聚集在初始处理程序中,直到队列被清空,然后为下一个处理程序再次执行此操作,并不断重复。

关于在负载下如何提高该系统的性能的任何想法,以便源源不断的已处理消息流出来,而不是在一条新记录出现在另一端之前的一堆消息和长时间延迟?

谢谢威尔

0 投票
1 回答
64 浏览

nservicebus - 用于处理来自公共共享服务的事件的 NServiceBus 模式

我们有一种情况,我们的一些服务在我们的系统中共享。例如,跟踪股票走势的一种。每当文章的库存水平发生变化时,就会引发一个事件。

我们遇到的问题是,虽然有时另一个服务可能对所有库存变化事件感兴趣(例如进行一些聚合),但在大多数情况下,只有特定操作导致的库存变化才是有趣的。

我们现在面临的问题是这样的。假设有一个 IArticleStockChangedEvent 事件,其中包含文章编号、库存更改和请求更改的 ProcessId。每次更改文章库存都会引发此事件。

现在一些外部服务有一个 saga 来改变 10 篇文章,并命令 stock 服务来做到这一点。它还实现了 IHandleMessages 来跟踪进度。这在理论上很有效,但实际上这意味着包含此 saga 的服务将被不相关的 IArticleStockChangedEvent 消息淹没,它将无法找到相应的 saga 实例。虽然在技术上没有破坏任何东西,但它会导致系统不必要的延迟。

我并不真的期待为每个可能导致股票变化的传奇创建一种新的 IArticleStockChangedEvent。处理此问题的推荐方法是什么?

谢谢

0 投票
2 回答
277 浏览

nservicebus - 如何查询存储在 SQL Persistence 表中的 Sagas

我需要查询 Saga Data 类的属性以获取列表。它作为序列化对象存储在 SqlPersistance 表 [Data] 列中。想想我的 SagaData 有一个名为 UserName 的属性的场景,所以我想查询与该用户相关的每个 saga。以一种草率的方式,我可以查询列内容,获取列表,并可以从内容中创建 Saga 对象,通过如下查询:

但我正在寻找一种优雅的方法,可能使用 NserviceBus API。ParticularSoftware 文档(链接: https ://docs.particular.net/persistence/sql/saga-finder)中描述了一个 SagaFinder 实现,但这仅返回一个不完全适合我的场景的对象。

这里是如何在文档中实现的:

任何想法表示赞赏。谢谢!