0

我有一个基于轮询的协议,我想利用 RX 将其转换为基于推送的协议。每隔 x 秒,我使用协议请求标签(名称和值),然后我从中取回它们。

我只对更改标签中的值感兴趣,所以我使用 DistinctUntilChanges 函数。

this.TagsChangeNotifier = _tags
    .Select(tag => 
    { 
        return Observable
            .Interval(ts)
            .Select(_ => { return tag; })
            .DistinctUntilChanged(new DataTagComparer()); 
    })
    .Merge();

这是 DataTagcomparer 类。

public class DataTagComparer : IEqualityComparer<DataTag>
{

    public bool Equals(DataTag x, DataTag y)
    {
            b = y.WeakRawValue.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());

        return b;
    }

    public int GetHashCode(DataTag obj)
    {
        return obj.Name.GetHashCode();
    }
}

但看起来无法工作,因为我永远看不到两个不同值之间的比较。这是一个例子。

Start program: DataTag("Test",1)
Equals called: x = ("Test",1), y = ("Test",1)

等待 10 秒,然后从协议更改为返回 2 而不是 1。

Equals called: x = ("Test",1), y = ("Test",1)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)

等等。

奇怪的是,它完全没有以前值和当前值之间的比较!你知道可能是什么问题吗?实际上我正在使用这个可怕的解决方法

public class DataTagComparer : IEqualityComparer<DataTag>
{

    private object val;

    public bool Equals(DataTag x, DataTag y)
    {
        bool b = true;

        if (val != null)
            b = val.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());

        val = x.WeakRawValue;

        return b;
    }

    public int GetHashCode(DataTag obj)
    {
        return obj.Name.GetHashCode();
    }
}

谢谢你的关注,文森佐。

编辑:DataTag 类代码

public abstract class DataTag
{
    public DataTag(string _Name, string Desc)
    {
        Name = _Name;
        Description = Desc;
    }
    public string Name { get; private set; }
    public string Description { get; private set; }
    public abstract object WeakValue { get; }
    public abstract object WeakRawValue { get; }
}

编辑:标签更新功能

this.timerHandle = Observable.Interval(ts).Select(_ => { Update(); return _; }).Publish().Connect();
4

1 回答 1

1

根据您的描述,这....看起来不正确-尽管我可能会误解您...

this.TagsChangeNotifier = _tags
    // for each tag value in tags...
    .Select(tag => 
    { 
        // Tick off every TimeSpan ts, then...
        return Observable.Interval(ts)
            // Say we've "ticked"
            .Do(tick => Console.WriteLine("It's time to tick!"))
            // Return the value "tag" (which remains constant...)
            .Select(_ => { return tag; })
            // Say what we see
            .Do(t => Console.WriteLine("I see a {0}!", t))
            // But only when it's different from the last one
            // (but we never change the value?)
            .DistinctUntilChanged(new DataTagComparer()); 
    })    
    // And mash them all together into one stream
    .Merge();

我想这一切都取决于是什么_tags,在某种程度上取决于定义DataTag是什么,但我认为这不是你真正想要的。

编辑:

让我们画出流程 - 从 开始_tags,我暂时假设是IObservable

Time   _tags
  |    tag1
  |    tag2
  |    tag3

到目前为止,一切都很好 - 现在对于其中的每一个,我们Select创建一个Interval

Time   _tags
  |    tag1
  |     \---- Interval
  |
  |    tag2
  |     \---- Interval
  |
  |    tag3
  |     \---- Interval
  |

我们打勾了一会儿,每次都重新选择标签:

Time   _tags
  |    tag1
  |     \---- Interval
  |            \-------Tick -> tag1
  |            \-------Tick -> tag1
  |            \-------Tick -> tag1
  |    tag2
  |     \---- Interval
  |            \-------Tick -> tag2
  |            \-------Tick -> tag2
  |            \-------Tick -> tag2
  |    tag3
  |     \---- Interval
  |            \-------Tick -> tag3
  |            \-------Tick -> tag3
  |            \-------Tick -> tag3
  |

然后我们添加DistinctUntilChanged

Time   _tags
  |    tag1
  |     \---- Interval
  |            \-------Tick -> tag1 ---> tag1
  |            \-------Tick -> tag1 -X
  |            \-------Tick -> tag1 -X
  |    tag2
  |     \---- Interval
  |            \-------Tick -> tag2 ---> tag2
  |            \-------Tick -> tag2 -X
  |            \-------Tick -> tag2 -X
  |    tag3
  |     \---- Interval
  |            \-------Tick -> tag3 ---> tag3
  |            \-------Tick -> tag3 -X
  |            \-------Tick -> tag3 -X
  |

最后Merge是子流:

Time   _tags                                      Output
  |    tag1                                         |
  |     \---- Interval                              |
  |            \-------Tick -> tag1 ---> tag1       tag1
  |            \-------Tick -> tag1 -X              |
  |            \-------Tick -> tag1 -X              |
  |    tag2                                         |
  |     \---- Interval                              |
  |            \-------Tick -> tag2 ---> tag2       tag2
  |            \-------Tick -> tag2 -X              |
  |            \-------Tick -> tag2 -X              |
  |    tag3                                         |
  |     \---- Interval                              |
  |            \-------Tick -> tag3 ---> tag3       tag3
  |            \-------Tick -> tag3 -X              |
  |            \-------Tick -> tag3 -X              |
  |

因此,如果您需要做的就是捕捉值流发生变化时,您可以尝试如下形状:

// my fake source of "tags", in this case simple strings
var subject = new Subject<string>();
var source = subject.Publish().RefCount();

// Still want to track "distinct chains"
var distincts = source.DistinctUntilChanged();
// But we also want to "look into the future", and see the *next* distinct chain
var futureDistincts = source.DistinctUntilChanged().Skip(1);
// A "delta" occurs when a distinct chain ends, so we'll zip the two
// sequences together (since they are "now distinct" and "now + 1", this will mean changes)
var onlyDeltas = distincts
    .Zip(futureDistincts, (before,after) => Tuple.Create(before,after));

using(onlyDeltas.Subscribe(Console.WriteLine))
{
    subject.OnNext("Foo");
    subject.OnNext("Foo");
    subject.OnNext("Foo");
    subject.OnNext("Bar");  // BAM: triggers an output value of (Foo, Bar)
    subject.OnNext("Bar");
    subject.OnNext("Foo"); // BAM: triggers an output value of (Bar, Foo)
}
于 2013-04-02T23:02:15.393 回答