4

我正在各种平台上尝试响应式扩展,而让我有点恼火的一件事是故障。

即使对于 UI 代码,这些故障可能不是那么成问题,并且通常可以找到解决它们的操作符,但我仍然发现在存在故障的情况下调试代码更加困难:中间结果对于调试并不重要,但我的想法不知道结果何时是中间或“最终”。

在 Haskell 和同步数据流系统中使用纯功能 FRP 进行了一些工作后,它也“感觉”错了,但这当然是主观的。

但是当将 RX 连接到非 UI 执行器(如电机或开关)时,我认为故障问题更大。如何确保只有正确的值被发送到外部执行器?

也许这可以通过一些知道一些“外部传感器”何时触发启动事件的“调度程序”来解决,以便在将最终结果转发给执行器之前处理所有内部事件。类似于flajax论文中描述的东西。

我希望得到答案的问题是:

  1. RX 中是否存在无法修复同步通知故障的问题?
  2. 如果没有,RX 是否存在(最好是生产质量)库或方法来修复同步故障?特别是对于单线程 Javascript,这可能有意义吗?
  3. 如果不存在通用解决方案,如何使用 RX 来控制外部传感器/执行器而不会在执行器上出现故障?

让我举个例子

假设我想打印(a,b)合同所在的元组序列

a=n    b=10 * floor(n/10)

n 是自然数流 = 0,1,2....

所以我期望以下序列

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=9, b=0)
(a=10, b=10)
(a=11, b=10)
...

在 RX 中,为了让事情更有趣,我将使用过滤器来计算 b 流

var n = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Publish()
        .RefCount();

var a = n.Select(t => "a=" + t);

var b = n.Where(t => t % 10 == 0)
        .Select(t => "b=" + t);

var ab = a.CombineLatest(b, Tuple.Create);

ab.Subscribe(Console.WriteLine);

这给出了我认为是一个小故障(暂时违反不变量/合同):

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=10, b=0) <-- glitch?
(a=10, b=10)
(a=11, b=10)

我意识到这是 CombineLatest 的正确行为,但我也认为这被称为故障,因为在真正的纯 FRP 系统中,您不会得到这些违反中间不变性的结果。

请注意,在此示例中,我将无法使用 Zip,而且 WithLatestFrom 也会给出不正确的结果。

当然,我可以将这个例子简化为一个单子计算,从不多播 n 个流事件(这意味着不能过滤而只能映射),但这不是重点:在 RX 中的 IMO 你总是得到一个“故障” ' 每当您拆分并重新加入可观察流时:

    s
   / \
  a   b
   \ /
    t

例如,在 FlapJAX 中,您不会遇到这些问题。

这有什么意义吗?

非常感谢,彼得

4

1 回答 1

2

更新:让我尝试在 RX 上下文中回答我自己的问题。

首先,我对什么是“故障”的理解似乎是错误的。从纯 FRP 的角度来看,在我看来 RX 中的故障似乎实际上是 RX 中的正确行为。

所以我猜想,在 RX 中,我们需要明确说明我们期望驱动来自传感器的组合值的“时间”。

在我自己的示例中,执行器是控制台,传感器是间隔n

所以如果我改变我的代码

ab.Subscribe(Console.WriteLine);

进入

ab.Sample(n).Subscribe(Console.WriteLine);

然后只打印“正确”的值。

这确实意味着,当我们得到一个组合来自传感器的值的可观察序列时,我们必须知道所有原始传感器,将它们全部合并,并在将任何值发送到执行器之前使用合并后的信号对值进行采样......

因此,另一种方法是将 IObservable “提升”为“感知”结构,该结构可以记住并合并原始传感器,例如:

public struct Sensed<T>
{
    public IObservable<T> Values;
    public IObservable<Unit> Sensors;

    public Sensed(IObservable<T> values, IObservable<Unit> sensors)
    {
        Values = values;
        Sensors = sensors;
    }

    public IObservable<Unit> MergeSensors(IObservable<Unit> sensors)
    {
        return sensors == Sensors ? Sensors : Sensors.Merge(sensors);
    }

    public IObservable<T> MergeValues(IObservable<T> values)
    {
        return values == Values ? Values : Values.Merge(values);
    }
}

然后我们必须将所有 RX 方法转移到这个“Sensed”结构中:

public static class Sensed
{
    public static Sensed<T> Sensor<T>(this IObservable<T> source)
    {
        var hotSource = source.Publish().RefCount();
        return new Sensed<T>(hotSource, hotSource.Select(_ => Unit.Default));
    }

    public static Sensed<long> Interval(TimeSpan period)
    {
        return Observable.Interval(period).Sensor();
    }

    public static Sensed<TOut> Lift<TIn, TOut>(this Sensed<TIn> source, Func<IObservable<TIn>, IObservable<TOut>> lifter)
    {
        return new Sensed<TOut>(lifter(source.Values), source.Sensors);
    }

    public static Sensed<TOut> Select<TIn, TOut>(this Sensed<TIn> source, Func<TIn, TOut> func)
    {
        return source.Lift(values => values.Select(func));
    }

    public static Sensed<T> Where<T>(this Sensed<T> source, Func<T, bool> func)
    {
        return source.Lift(values => values.Where(func));
    }

    public static Sensed<T> Merge<T>(this Sensed<T> source1, Sensed<T> source2)
    {
        return new Sensed<T>(source1.MergeValues(source2.Values), source1.MergeSensors(source2.Sensors));
    }

    public static Sensed<TOut> CombineLatest<TIn1, TIn2, TOut>(this Sensed<TIn1> source1, Sensed<TIn2> source2, Func<TIn1, TIn2, TOut> func)
    {
        return new Sensed<TOut>(source1.Values.CombineLatest(source2.Values, func), source1.MergeSensors(source2.Sensors));
    }

    public static IDisposable Actuate<T>(this Sensed<T> source, Action<T> next) 
    {
        return source.Values.Sample(source.Sensors).Subscribe(next);
    }
}

我的例子就变成了:

var n = Sensed.Interval(TimeSpan.FromMilliseconds(100));
var a = n.Select(t => "a=" + t);
var b = n.Where(t => t % 10 == 0).Select(t => "b=" + t);
var ab = a.CombineLatest(b, Tuple.Create);
ab.Actuate(Console.WriteLine);

同样,只有“所需”值被传递给执行器,但通过这种设计,原始传感器会被记住在 Sensed 结构中。

我不确定这是否“有意义”(双关语),也许我应该放弃对纯 FRP 的渴望,并接受它。毕竟,时间是相对的;-)

彼得

于 2016-02-18T14:02:30.533 回答