2

我有一个跟踪状态的 Observable,有点像状态机。

我有一个 Observable 的订阅者,它检测事物何时处于特定状态,它需要将事物推入新状态,有效地调用 Observable 上的 OnNext()!

这违反了一次只有一个 OnNext() 正在运行的 Rx 原则!有什么好的解决方法吗?就像安排 OnNext() 在当前完成传播之后直接发生一样?

4

2 回答 2

2

在控制系统理论中,这种类型的系统是具有反馈的控制回路。它抵制传统分析,因为系统的输入取决于输出——这可能是一个连续函数。

可以对 Observables(本质上是离散的值流)做同样的事情,即以声明方式表达控制系统。但是,C# 语义使实现不是很优雅。(F# 可以选择递归绑定)。

首先,我们需要定义一个闭包来编写反馈循环。

    public delegate IObservable<T> Feedback<T>(IObservable<T> feedback, out IObservable<T> output);
    public static IObservable<T> FeedbackSystem<T>(Feedback<T> closure)
    {
        IObservable<T> source = Observable.Empty<T>(), output;
        source = closure(Observable.Defer(() => source), out output);
        return output;
    }

使用上述内容,这里是一个速度调节器的示例实现,它加速到 100,并保持低于它的速度,即使在速度中引入了随机误差。

    var system =
    FeedbackSystem((IObservable<double> acceleration, out IObservable<double> velocity) =>
    {
        //Time axis: moves forward every 0.1s
        double t = 0.1; var timeaxis = Observable.Interval(TimeSpan.FromSeconds(t));

        velocity = acceleration.Sample(timeaxis)                //move in time
                               .Scan((u, a) => u + a * t)   //u' = u + at
                               .Select(u => u + new Random().Next(10))  //some variations in speed
                               .Publish().RefCount();                   //avoid recalculation

        //negative feedback
        var feedback = velocity.Select(u => 0.5 * (100 - u));

        return feedback.Select(a => Math.Min(a, 15.0))  //new limited acceleration
                       .StartWith(0);                   //initial value          
    });

    system.Subscribe(Console.WriteLine);

通过以相同的方式使输入依赖于输出,状态机的情况是可能的。

于 2013-02-01T00:20:09.480 回答
0

顺便说一句,我会说您可能“想做其他事情” - 也就是说,在可观察对象中有另一个注入点,比如包含类等。有一个方法PushState(...)

无论如何,你可以保留“我会告诉你下一步是什么”的语义,方法是IObservable用 a支持Subject<T>或者在创建过程中指定你的注入路线Observable.Create

(虽然Subject<T>被很多人看不起,因为它“不纯”)

于 2013-01-30T18:32:09.340 回答