1

我正在编写一个消息代理,它可以利用文件系统上的 IBM MQ 和文件夹。在获取消息之后,它将它们具体化为强类型类并将它们插入到 RX 主题中。

我已经对消息建立了意识,使我能够识别需要命中哪些外部系统来处理它们,因此我可以对 RX observables 进行查询并选择不针对外部系统的消息等。

我接下来要做的是通过被击中的外部系统来限制消息,例如:

如果我正在使用某种类型的消息访问 CRM 系统,并且我决定要使用最多 4 个并发呼叫来访问该系统,我将一次只处理 4 条消息,如果我有第 5 条消息,我会必须等待前 4 个中的一个完成,然后再进行第 5 个。对于外部数据库、其他外部 Web 服务等其他类型的资源也是如此。

我已经开始研究这个问题,到目前为止最好的设计方法是编写自己的调度程序。缺点是我必须编写自己的内部结构,以便在消息被接收后在调度程序中排队,这就是我不喜欢这种方法的地方。

有没有人有更好的方法来做到这一点?

4

3 回答 3

1

您所描述的似乎是最大并发。运营商支持这样的Merge东西。

您需要使用类似的东西GroupBy来根据流的去向拆分流,然后Merge在每个拆分片段上使用最大并发,最后Merge将结果重新组合在一起。像这样的东西:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);
于 2013-08-17T22:06:57.843 回答
0

您可能需要查看 ReactiveUI,其中包含服务请求的速率限制机制。见http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/

于 2013-08-16T14:42:26.200 回答
0

我还在MSDN 上发布了同样的问题,并通过 Merge 运算符的不同实现得到了更深入的答案,这样当最大并发值发生变化时就不会发生数据丢失。

于 2013-08-20T09:20:33.257 回答