阅读 IntroToRx 网站,它不鼓励使用 Subject 来支持 Observable.Create 辅助方法。
正如我所看到的,OnNext 方法只能在 subscribe 方法中调用,因为它是我唯一可以访问 Observer 对象的部分。
如果我想在创建新值后推动它怎么办?我是否“被迫”使用主题?
阅读 IntroToRx 网站,它不鼓励使用 Subject 来支持 Observable.Create 辅助方法。
正如我所看到的,OnNext 方法只能在 subscribe 方法中调用,因为它是我唯一可以访问 Observer 对象的部分。
如果我想在创建新值后推动它怎么办?我是否“被迫”使用主题?
如果你只是在探索 Rx,那就去做吧——使用 Subjects,发疯,看看它们是如何工作的,自己发现它们的优缺点,然后回到这里阅读讨论为什么 Subject 不受欢迎的问题。
主题提供了一种更简单的方式来“快速引导”想法和复杂的 Rx 场景,而无需复制实际的源条件。
也就是说,它们确实将状态注入到某种无状态的操作链中,因此请注意不要依赖它们。
所以,总结一下:如果你想生成序列来测试/学习 rx 是如何工作的,或者你如何进行查询 X,请使用这些主题。如果您发现自己在内部查询中使用它们,那么有机会有更好的方法。
编辑:意识到我错过了一些东西:
另外,您问是否有另一种方法可以在创建后引发流事件……答案是肯定的;您可以通过 Create 或 Return 或 Generate 声明一个流,该流返回您定义的任何旧的基于 IObservable 的对象,它还可以公开注入事件的方法......或者哎呀,有一个 lambda 来旋转一个检查共享列表的线程被路由到返回流......我想我要说的是可能性是无穷无尽的。在 Observable 上声明了十几个“创建事件序列”方法 - 全部尝试!
编辑2:
一个例子?当然,让我们一起使用Observable.Create
模仿非常低效的东西Subject
:
var running = true;
var values = new ConcurrentQueue<int>();
var query = Observable.Create<int>(obs =>
{
var body = Task.Factory.StartNew(()=>
{
while(running)
{
int nextValue;
if(values.TryDequeue(out nextValue))
{
obs.OnNext(nextValue);
}
Thread.Yield();
}
});
return Disposable.Create(() =>
{
try
{
running = false;
body.Wait();
obs.OnCompleted();
}
catch(Exception ex)
{
obs.OnError(ex);
}
});
});
using(query.Subscribe(Console.WriteLine))
{
values.Enqueue(1);
values.Enqueue(2);
values.Enqueue(3);
values.Enqueue(4);
Console.ReadLine();
}
请注意,这只是快速且极其肮脏的示例代码。:)
这取决于你想要做什么。Subjects 有很多案例,但并不像他们第一次使用 Rx 时想象的那么多。
新数据将如何进入您的序列?它会来自另一个事件吗?也许是来自通信框架的消息?也许轮询文件?
根据这些答案,您通常会发现您已经拥有某种事件源,而您只是从另一种模式(事件、轮询、回调等)转换为 Rx
您也不必只使用 Observable.Create。您可以使用 Observable.Timer/Interval 设置轮询序列,使用 Observable.FromEventPattern 来利用现有事件,使用 Observable.Start 进行一次性异步任务样式计算等...
由于 Rx(甚至 Linq)可能非常抽象,因此提出抽象问题通常会导致非常广泛的答案。如果您指出您正在尝试解决的问题,这可能有助于为您提供更好的答案。
如果您从外部设备接收数据,您无意发出错误信号IObserver.OnError
(您假设您的流是无止境的和/或任何通信问题都在消息本身内),您以某种速度轮询,主题的唯一问题是你可能会在任何人订阅之前就开始轮询这个设备(但额外的好处是处理你的状态非常明显,你创建了一个对象,它打开了 COM 端口,它通信和发布值)
使用Observable.Create
orObservable.Timer/Interval
可能会更好 - 但懒惰是主要原因,无论如何你都会管理状态。您可能需要使用 Publish().RefCount() 来防止第二次订阅打开端口。