我有一个跟踪状态的 Observable,有点像状态机。
我有一个 Observable 的订阅者,它检测事物何时处于特定状态,它需要将事物推入新状态,有效地调用 Observable 上的 OnNext()!
这违反了一次只有一个 OnNext() 正在运行的 Rx 原则!有什么好的解决方法吗?就像安排 OnNext() 在当前完成传播之后直接发生一样?
我有一个跟踪状态的 Observable,有点像状态机。
我有一个 Observable 的订阅者,它检测事物何时处于特定状态,它需要将事物推入新状态,有效地调用 Observable 上的 OnNext()!
这违反了一次只有一个 OnNext() 正在运行的 Rx 原则!有什么好的解决方法吗?就像安排 OnNext() 在当前完成传播之后直接发生一样?
在控制系统理论中,这种类型的系统是具有反馈的控制回路。它抵制传统分析,因为系统的输入取决于输出——这可能是一个连续函数。
可以对 Observables(本质上是离散的值流)做同样的事情,即以声明方式表达控制系统。但是,C# 语义使实现不是很优雅。(F# 可以选择递归绑定)。
首先,我们需要定义一个闭包来编写反馈循环。
public delegate IObservable<T> Feedback<T>(IObservable<T> feedback, out IObservable<T> output);
public static IObservable<T> FeedbackSystem<T>(Feedback<T> closure)
{
IObservable<T> source = Observable.Empty<T>(), output;
source = closure(Observable.Defer(() => source), out output);
return output;
}
使用上述内容,这里是一个速度调节器的示例实现,它加速到 100,并保持低于它的速度,即使在速度中引入了随机误差。
var system =
FeedbackSystem((IObservable<double> acceleration, out IObservable<double> velocity) =>
{
//Time axis: moves forward every 0.1s
double t = 0.1; var timeaxis = Observable.Interval(TimeSpan.FromSeconds(t));
velocity = acceleration.Sample(timeaxis) //move in time
.Scan((u, a) => u + a * t) //u' = u + at
.Select(u => u + new Random().Next(10)) //some variations in speed
.Publish().RefCount(); //avoid recalculation
//negative feedback
var feedback = velocity.Select(u => 0.5 * (100 - u));
return feedback.Select(a => Math.Min(a, 15.0)) //new limited acceleration
.StartWith(0); //initial value
});
system.Subscribe(Console.WriteLine);
通过以相同的方式使输入依赖于输出,状态机的情况是可能的。
顺便说一句,我会说您可能“想做其他事情” - 也就是说,在可观察对象中有另一个注入点,比如包含类等。有一个方法PushState(...)
无论如何,你可以保留“我会告诉你下一步是什么”的语义,方法是IObservable
用 a支持Subject<T>
或者在创建过程中指定你的注入路线Observable.Create
(虽然Subject<T>
被很多人看不起,因为它“不纯”)