1

我计划在某个时候在我的一个项目中使用 Rx,并且我一直在研究我可以用 Rx 做什么。

我的项目使用 TPL 并行处理状态机转换(利用所有可用的处理器内核)。但是,为了提高性能,我想用 Rx 推送机制替换当前的 IList<> 拉取机制。

由于我对这项技术不太了解,我想确定 Rx 是否与将令牌推送到并行状态转换兼容。我将要求所有并行状态转换都订阅 Rx 主题并检索下一个令牌。每个状态转换都需要来自主题的相同标记。根据我自己的研究了解到,一旦令牌被推送给观察者(在这种情况下为状态转换),该令牌就永远消失了。如果是这种情况,其他转换(对于相同的状态)将永远不会收到此令牌并最终处于错误状态。

有人能告诉我我的担忧是否正确吗?我没有要显示的代码,因为我只是在探索我的选择。

谢谢你。

4

3 回答 3

1

将 Rx Subject 视为类似于 .NET 事件可能会有所帮助,即已注册事件处理程序的线程安全列表。在 Subject 的情况下,这些可以被认为等同于单个 Observer OnNext 代表。一旦事件触发,每个处理程序都会使用相同的事件参数调用。同样,订阅 Subject 的每个 Observer 都会使用相同的参数对象调用 OnNext。此参数可能包含状态转换所需的令牌的值类型或引用类型。大概您的 Observer 的 OnNext 处理程序将存储此令牌(例如在并发队列中)以用于相应的并行任务消耗。

因此,除非您简单地忽略 OnNext 调用或丢弃它们传递给您的对象,否则没有什么是永远的。

如果您的任务之一更改了令牌,您可能已经共享状态损坏问题。或者你可能有同步问题,如果一个任务处理了几个状态转换令牌,而另一个任务甚至没有时间出列第一个任务。但这并不特定于使用 Rx 主题,而不是其他一些技术来推送令牌。

于 2012-11-23T07:47:48.177 回答
0

您可能正在寻找

IConnectableObservable<T> IObservable<T>.Publish()

可连接的 observable 是原始 observable 的多播变体,可以多次订阅。首先,您将所有观察者订阅到可连接的可观察对象,然后在可连接的可观察对象上调用连接。

于 2012-11-21T08:56:28.983 回答
-1

@Tony 提出了一个很好的观点,即 Observable 序列(IObservable)就像一个事件,因为它可以有一个线程安全的注册订阅者/事件处理程序列表。因此,与事件一样,许多注册订阅者可以接收事件有效负载(以串行方式,首先调用第一个订阅者)。

然而,没有解决的是一个潜在的问题,即 Rx 可能会帮助您处理那些事件也不会借给自己的时间。例如,您可能在设置所有并行状态转换处理程序之前发布/推送了令牌。如果您使用事件或标准可观察序列,则有效负载将丢失。您可以通过使用 Rx 中的缓存功能(例如 Replay)来解决此问题。这可以让您将最后一个值重播给迟到的订阅者。

如果这很有趣,那里有大量信息。你可以在我的书的部分阅读更多关于它的信息:http: //introtorx.com/Content/v1.0.10621.0/02_KeyTypes.html

于 2012-12-10T08:17:43.977 回答