BizTalk 中有序交付的重新排序策略:
我最近回答了一位 LinkedIn 用户关于 BizTalk 中的订购交付选项的问题。
我认为了解使用 BizTalk 对消息重新排序的一些策略对人们很有用。
通常,作为 BizTalk 开发人员,您需要集成到不可更改的业务线系统。这可能是由于许多不同原因中的一个或多个。例如,更改系统的成本可能太高,或者供应商许可声明如果更改系统可能会取消支持。
如果供应商提供了精心设计的 API 作为集成点端点,这通常不会代表问题。然而,正如许多集成开发人员很快了解到的那样,这种情况很少见。
精心设计的 API 是什么意思?好吧,除了所有 SODA 原则(服务组合、故障契约等)之外,API 最重要的特性是支持以错误顺序到达的数据的消费。
这是一件相当简单的事情。例如,如果您是供应商并且您提供 HTTP 操作作为集成点,那么您可以在操作中公开的字段之一是时间戳,或者更好的是序列号。这意味着,如果使用过时的有效负载进行调用,相关的补偿机制可以启动——这可以像丢弃数据一样简单。
本文讨论了当供应商没有将此功能构建到 API 中时该怎么做,因此作为集成商,您被迫将端到端的有序交付作为集成解决方案的一部分来实现。
正如我在 LinkedIn 上对用户帖子的回复中所述(请参阅上面的链接),在 BizTalk 中,除了最简单的场景之外,任何情况下的订单交付充其量都是复杂的,而在最坏的情况下,无论是在开发和支持。根本原因是BizTalk被设计成海量并发来实现高吞吐量,并发和排序之间存在直接且不可避免的冲突。鞋拔 E2E 有序交付到 BizTalk 解决方案依赖于人工制品,例如单件编排,这会引入复杂性并增加故障率和每次故障的成本。
更好的解决方案是将并发处理保持在尽可能接近业务线系统端点的位置,然后在每个端点周围实施所谓的重新排序器包装器,这些端点需要以正确的顺序传递数据.
如何在 BizTalk 中实现这样的包装取决于一些因素,下表概述了这些因素:
|Sequencing |Messages|Database |Wrapper |
|field |are |integration?|strategy |
| |deltas? | | |
|--------------|--------|------------|----------------------------------|
|n of a total m| N | Y |Stored procedure |
|n of a total m| N | N |Singleton orchestration |
|n of a total m| Y | Y |Batched singleton orchestration |
|n of a total m| Y | N |Batched singleton orchestration |
|Timestamp | N | Y |Stored procedure |
|Timestamp | N | N |Singleton orchestration |
|Timestamp | Y | Y |Buffer table with staggered reader|
|Timestamp | Y | N |Buffer table with staggered reader|
第一个因素Sequencing 字段与这样一种想法有关,即为了实现任何类型的重新排序器包装器,您至少需要您的消息数据包含一些排序信息。这可以采用源时间戳的形式;然而,一种更好但更罕见的排序信息由序列号和消息总数组成,例如,10 条消息中的 1 条、10 条中的 2 条等。
第二个因素消息是增量?与您的消息的有效负载是否包含对数据的单个状态更改或对数据的所有过去更改的总和有关。换句话说,是否可以从该消息中重建数据的完整当前状态?如果消息有效负载仅包含单个更改,则可能无法从单个消息重建数据状态,在这种情况下,您的消息是delta。
第三个因素数据库集成?与系统的集成入口点是否是数据库有关。这很重要的原因是,在数据库层集成是一种相当常见的集成场景,如果可用的话,可以大大简化重新排序的处理。
上表中的策略详细描述如下:
存储过程包装器
这是最简单的重排序策略。创建一个新的存储过程,在决定是否更新目标数据之前查询目标数据。决策可以简单到我拥有的数据是否比目标系统中的数据更新?
当然,为了实现这一策略,目标数据还必须包括源数据的排序字段,尽管必要时可以通过依赖目标数据中可能已经存在的现有时间戳来进行近似。存储过程包装器可以包含在目标数据库中,也可以理想地包含在单独的数据库中。
单例编排包装器
该策略背后的思想是单例编排。这是一种您可以实施的模式,以确保在任何时候都只存在一个编排实例。网上有很多文章演示了如何在 BizTalk 中实现这种模式。
这个想法的核心是单例简单地跟踪最近成功处理的消息序列(或时间戳)。如果单例接收到比最近序列更旧的消息,则将其简单地丢弃。这是有效的,因为消息是non-deltas,因此目标系统只能提交许多消息中的最新消息,并且数据将处于最新状态。只有当数据提交成功时,单例持有的最新序列才会更新。
批处理单例编排包装器
该策略基于上面的单例编排包装器,但更复杂。与其只将最新的序列信息保存在内存中,还需要单例在内存中创建和保存一组工作消息,它将重新排序,然后在批处理中的所有预期消息到达后进行处理。这是因为消息是增量,因此目标系统必须按照预期的顺序接收每条消息。一旦批处理成功发送,单例就可以终止。
为此,源数据的必要条件是它包含一些描述的相关标识符,该标识符允许定义批量消息。例如,处理来自客户的一组定义的订单,入站消息必须包含客户的标识符。然后可以使用它来将消息路由到与该客户相关的单例编排实例。此外,可用的消息序列字段必须是总共 m形式的 n。
一旦初始化了单例,它就会在内存中组装一组工作消息,并在新消息到达时继续填充它。我看到的一种方法是使用System.Collections.Generic.List作为工作集的容器。一旦列表已完全填充(列表长度 = m),则假定已接收到批处理中的所有消息,然后编排按顺序循环工作集并将消息处理到目标系统中。
批处理单例编排包装器的好处之一是它允许通过相关标识符进行并发处理。在上面的示例中,这意味着来自两个客户的消息将同时处理。
带有交错读取器包装器的缓冲表
可以说是所提出的最复杂的策略,当您使用基于时间戳的排序字段进行增量消息传递时,可以使用此解决方案。它可以通过某种描述的数据库来实现,该数据库充当重新排序缓冲区。
这里值得注意的是,这种重新排序的包装器并不能保证有序交付,但使用得当它使有序交付的可能性很大。
当消息到达时,它们被写入缓冲区,并且在相同的操作中缓冲区被重新排序,因此缓冲区中保存的消息的顺序总是正确的。
To create the buffer reader, have a receive location which reads the messages in the buffer before passing the messages to a send port with ordered delivery enabled, which then will process the messages into the target system. You can also use a singleton orchestration as an intermediary if your target system's API semantics are too complex for a send port.
However, using this wrapper as I have described it above will not enable ordered delivery, as the messages will almost certainly be committed to the buffer in the wrong order, which will result in the messages being processed into the target system in the same (wrong) order. This is where the staggered query comes in. This is a fancy way of saying your buffer query needs to only select data at intervals of time T, AND only select those rows where the row-number is lower than buffer total row count minus C.
这具有允许在适当的时间跨度内进行排序的效果。大多数 BizTalk 开发人员都熟悉T作为某些适配器(例如WCF-SQL适配器)的轮询间隔。C稍微难以设置,但是通过增加这个数字,您可以减少在轮询时错过比检索数据集中的最新消息更早的消息的机会。
What T and C are depends on many things, although these values should be based on your latency SLA and your message volume (or throughput). As a guideline, if you have a SLA to deliver data into your target system within 30 seconds and you process 10 messages per second then T should be around 10 seconds and C should be around 100 rows.
Of course this only works if your messages for a given correlation id are sent by the source system during a short space of time (ideally back-to-back). The longer the interval between sends, the greater the required value of C, and the less effective the wrapper becomes.
One of the benefits of this strategy is you can also perform de-duplication of messages in the buffer if your data source is prone to sending duplicate messages and your target system endpoint is not idempotent. You can also use the buffer to implement FILO and other non-standard queueing semantics.
Conclusions
The strategies I have discussed here are ways of bending BizTalk to a task which is wasn't designed to do. As a result each has caveats around cost and complexity to support, and also may not work in certain scenarios. I would like to hear from anyone who has implemented other patterns for ordered delivery in BizTalk.