我正在编写一个消息代理,它可以利用文件系统上的 IBM MQ 和文件夹。在获取消息之后,它将它们具体化为强类型类并将它们插入到 RX 主题中。
我已经对消息建立了意识,使我能够识别需要命中哪些外部系统来处理它们,因此我可以对 RX observables 进行查询并选择不针对外部系统的消息等。
我接下来要做的是通过被击中的外部系统来限制消息,例如:
如果我正在使用某种类型的消息访问 CRM 系统,并且我决定要使用最多 4 个并发呼叫来访问该系统,我将一次只处理 4 条消息,如果我有第 5 条消息,我会必须等待前 4 个中的一个完成,然后再进行第 5 个。对于外部数据库、其他外部 Web 服务等其他类型的资源也是如此。
我已经开始研究这个问题,到目前为止最好的设计方法是编写自己的调度程序。缺点是我必须编写自己的内部结构,以便在消息被接收后在调度程序中排队,这就是我不喜欢这种方法的地方。
有没有人有更好的方法来做到这一点?