5

我目前正在开发一个通过方程式组合许多数据流的应用程序。我想做的是:

var result = (stream1 + stream2) / stream3 + stream4 * 2;

每当result任何流更新时都会更新。目前,我可以在 Rx 中表达这一点的唯一方法是:

var result = stream1.CombineLatest(stream2, (x,y) => x+y)
  .CombineLatest(stream3, (x,y) => x / y)
  .CombineLatest(stream4, (x,y) => x + y*2);

这几乎不是那么清楚。

我目前的想法如下:

Public class ArithmeticStream : IObservable<double>
{
    public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
    {
        return Observable.CombineLatest(xx,yy, (x,y) => x + y);
    }
    ...
}

问题是 CombineLatest 返回一个IObservable<double>而不是 ArithmeticStream。

两个可能的问题:

如何透明地将 aIObservable<double>转换为 ArithmeticStream?

有没有其他路线可以让我得到我想要的结果?

4

4 回答 4

2

嗯......我认为你可以相对容易地做到 DSL 风格(不用摆弄运营商):

public static class Ext
{
    public static IObservable<double> Const(this double constant)
    {
        return Observable.Return(constant);
    }

    public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l + r);
    }
    public static IObservable<double> Minus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l - r);
    }
    public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l * r);
    }
    public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right,(l,r) => l / r);
    }
}

所以你的查询是:

var result = (s1.Plus(s2)).Over(s3)
        .Plus(s4)
        .Times(2.0.Const());

或者,对于一个非常健谈的变体:

var verboseResult = 
    (s1.Do(Console.WriteLine).Plus(s2.Do(Console.WriteLine)))
    .Over(s3.Do(Console.WriteLine))
    .Plus(s4.Do(Console.WriteLine))
    .Times(2.0.Const())
    .Do(x => Console.WriteLine("(s1 + s2) / s3 + s4 * 2 = " + x));
于 2013-02-28T04:08:40.197 回答
2

将其添加为新答案,因为它完全不同...

因此,如果您致力于执行操作员重载路线,那么您需要这样做(嗯,至少是一种方式)......老实说,我不喜欢这种方法 -虽然它确实使查询编写更加简洁,但 DSL 方法具有类似的“简洁性”,但在不依赖运算符重载的意义上更清晰。

public static class ArithmeticStreamExt
{
    public static ArithmeticStream Wrap(this IObservable<double> src)
    {
        return new ArithmeticStream(src);
    }
    public static ArithmeticStream Const(this double constValue)
    {
        return new ArithmeticStream(Observable.Return(constValue));
    }
}
public class ArithmeticStream 
{
    private IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> source)
    {
        _inner = source;
    }

    public IObservable<double> Source {get { return _inner; }}

    public static ArithmeticStream operator +(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l / r));
    }

    public static ArithmeticStream operator +(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
             left._inner.CombineLatest(right, (l, r) => l / r));
    }
}

还有一个测试台:

void Main()
{
    var s1 = new Subject<double>();
    var s2 = new Subject<double>();
    var s3 = new Subject<double>();
    var s4 = new Subject<double>();

    var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
    using(result.Source.Subscribe(Console.WriteLine))
    {
        s1.OnNext(1.0);
        s2.OnNext(2.0);
        s3.OnNext(3.0);
        s4.OnNext(4.0);
    }
}
于 2013-02-28T17:16:41.920 回答
1

考虑创建一个接受 3、4、5 等 IObservables 和具有匹配数量的结果选择器函数的 CombineLatest 版本。这将使您将算术运算表达为对双精度的普通旧运算 - 非常简单和干净。

如果你需要帮助来实现这些,就这么说吧,我会举个例子。

编辑

我所指的 CombineLatest 重载已经存在,只是没有记录。

于 2013-02-28T04:18:58.630 回答
0

这不如运算符重载好,但我很确定你不能做你想要的那种运算符重载,因为扩展方法中不支持运算符重载。

var stream1 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.5, x => x);
var stream2 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.25, x => x);
var stream3 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.125, x => x);
var stream4 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.0625, x => x);
var result = stream1.CombineLatest(stream2, stream3, stream4, (w, x, y, z) => (w + x) / y + z * 2);

否则,JerKimball 给出了一个很好的答案。

于 2013-02-28T16:38:18.350 回答