1

我有以下类型的热门事件流:

Event
{
    string name;
    int state ; // its 1 or 2 ie active or unactive
}

有一个函数提供给定名称的父名称 - string GetParent(string name)

我需要为每个父级缓冲事件 2 分钟,如果在这 2 分钟内,我为给定父级的 state =2 的子级接收任何事件,则该缓冲区应取消并应输出 0 否则我将获得事件 recvd 的计数。

我知道我必须使用 GroupBy 进行分区,然后缓冲然后计数,但我无法想出一种方法来创建每个父级唯一的 Buffer,我虽然使用 Distinct 但这并不能解决问题,因为我只是不想创建缓冲区,直到父级处于活动状态(一旦父级的缓冲区被取消或 2 分钟结束,父级缓冲区可以再次创建)所以我知道我需要创建一个自定义缓冲区来检查创建缓冲区的条件,但是我如何通过响应式扩展来做到这一点。任何帮助将不胜感激。问候

感谢布兰登的帮助。这是我用于测试的主要程序。它不工作。因为我是反应式扩展的新手,所以我正在测试的方式可能是这样

namespace TestReactive
{
class Program
{
    static int abc = 1;
    static void Main(string[] args)
    {
        Subject<AEvent> memberAdded = new Subject<AEvent>();
        //ISubject<AEvent, AEvent> syncedSubject = new ISubject<AEvent, AEvent>();
        var timer = new Timer { Interval = 5 };

        timer.Enabled = true;
        timer.Elapsed += (sender, e) => MyElapsedMethod(sender, e, memberAdded);

        var bc = memberAdded.Subscribe();
        var cdc = memberAdded.GroupBy(e => e.parent)
  .SelectMany(parentGroup =>
  {
      var children = parentGroup.Publish().RefCount();
      var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0);
      var timer1 = Observable.Timer(TimeSpan.FromSeconds(1));
      var activeCount = children.TakeUntil(timer1).Count();

      return Observable.Amb(activeCount, inactiveChild)
          .Select(count => new { ParentName = parentGroup.Key, Count = count });

  });

      Observable.ForEachAsync(cdc, x => WriteMe("Dum Dum " + x.ParentName+x.Count));
        // group.Dump("Dum");


        Console.ReadKey();
    }

    static void WriteMe(string sb)
    {

        Console.WriteLine(sb);
    }
    static void MyElapsedMethod(object sender, ElapsedEventArgs e, Subject<AEvent> s)
    {
        AEvent ab = HelperMethods.GetAlarm();
        Console.WriteLine(abc + " p =" + ab.parent + ", c = " + ab.name + " ,s = " + ab.state);
        s.OnNext(ab);

    }

}

}

    public static AEvent GetAlarm()
    {
        if (gp> 4)
            gp = 1;
        if (p > 4)
            p = 1;
        if (c > 4)
            c = 1;
        AEvent a = new AEvent();
        a.parent = "P" + gp + p;
        a.name = "C" + gp + p + c;
        if (containedKeys.ContainsKey(a.name))
        {
            a.state = containedKeys[a.name];
            if (a.state == 1)
                containedKeys[a.name] = 2;
            else
                containedKeys[a.name] = 1;

        }
        else
        {
            containedKeys.TryAdd(a.name, 1);

        }
        gp++; p++; c++;

        return a;

    }

因此,此方法在每次滴答时为 Parent 生成一个事件。它为状态 = 1 的父 P11、P22、P33、P44 生成事件,然后是状态 = 2 的父 P11、P22、P33、P44 的事件我正在使用 Observable.ForEach 打印结果,我看到它被调用4次,之后什么都没有,就像取消组一样没有发生

4

2 回答 2

3

假设每个组的两分钟缓冲区应该在看到该组的第一个事件后立即打开,并在两分钟后关闭或看到零状态,那么我认为以下工作:

public static IObservable<EventCount> EventCountByParent(
    this IObservable<Event> source, IScheduler scheduler)
{
    return Observable.Create<EventCount>(observer => source.GroupByUntil(
        evt => GetParent(evt.Name),
        evt => evt,
        group =>
        @group.Where(evt => evt.State == 2)
              .Merge(Observable.Timer(
                  TimeSpan.FromMinutes(2), scheduler).Select(_ => Event.Null)))
              .SelectMany(
                  go =>
                  go.Aggregate(0, (acc, evt) => (evt.State == 2 ? 0 : acc + 1))
              .Select(count => new EventCount(go.Key, count))).Subscribe(observer));
}

使用 EventCount(为测试实现相等覆盖)为:

public class EventCount
{
    private readonly string _name;
    private readonly int _count;

    public EventCount(string name, int count)
    {
        _name = name;
        _count = count;
    }

    public string Name { get { return _name; } }
    public int Count { get { return _count; } }

    public override string ToString()
    {
        return string.Format("Name: {0}, Count: {1}", _name, _count);
    }

    protected bool Equals(EventCount other)
    {
        return string.Equals(_name, other._name) && _count == other._count;
    }

    public override bool Equals(object obj)
    {
        if (ReferenceEquals(null, obj)) return false;
        if (ReferenceEquals(this, obj)) return true;
        if (obj.GetType() != this.GetType()) return false;
        return Equals((EventCount) obj);
    }

    public override int GetHashCode()
    {
        unchecked
        {
            return ((_name != null ? _name.GetHashCode() : 0)*397) ^ _count;
        }
    }
}

和事件为:

public class Event
{
    public static Event Null = new Event(string.Empty, 0);

    private readonly string _name;
    private readonly int _state;

    public Event(string name, int state)
    {
        _name = name;
        _state = state;
    }

    public string Name { get { return _name;  } }
    public int State { get { return _state; } }
}

我用 Rx-Testing 做了一个快速(即不是详尽的)测试:

public class EventCountByParentTests : ReactiveTest
{
    private readonly TestScheduler _testScheduler;

    public EventCountByParentTests()
    {
        _testScheduler = new TestScheduler();
    }

    [Fact]
    public void IsCorrect()
    {
        var source = _testScheduler.CreateHotObservable(
            OnNext(TimeSpan.FromSeconds(10).Ticks, new Event("A", 1)),
            OnNext(TimeSpan.FromSeconds(20).Ticks, new Event("B", 1)),
            OnNext(TimeSpan.FromSeconds(30).Ticks, new Event("A", 1)),
            OnNext(TimeSpan.FromSeconds(40).Ticks, new Event("B", 1)),
            OnNext(TimeSpan.FromSeconds(50).Ticks, new Event("A", 1)),
            OnNext(TimeSpan.FromSeconds(60).Ticks, new Event("B", 2)),
            OnNext(TimeSpan.FromSeconds(70).Ticks, new Event("A", 1)),
            OnNext(TimeSpan.FromSeconds(140).Ticks, new Event("A", 1)),
            OnNext(TimeSpan.FromSeconds(150).Ticks, new Event("A", 1)));

        var results = _testScheduler.CreateObserver<EventCount>();

        var sut = source.EventCountByParent(_testScheduler).Subscribe(results);

        _testScheduler.Start();

        results.Messages.AssertEqual(
            OnNext(TimeSpan.FromSeconds(60).Ticks, new EventCount("B", 0)),
            OnNext(TimeSpan.FromSeconds(130).Ticks, new EventCount("A", 4)),
            OnNext(TimeSpan.FromSeconds(260).Ticks, new EventCount("A", 2)));
    }
}
于 2013-05-09T21:14:42.347 回答
0

就像是....

source.GroupBy(e => GetParent(e.name))
      .SelectMany(parentGroup =>
      {
          var children = parentGroup.Publish().RefCount();
          var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0);
          var timer = Observable.Timer(TimeSpan.FromMinutes(2));
          var activeCount = children.TakeUntil(timer).Count();

          return Observable.Amb(activeCount, inactiveChild)
              .Select(count => new { ParentName = parentGroup.Key, Count = count };
      });

这将为您提供一系列 { ParentName, Count } 对象。

于 2013-05-09T15:24:39.593 回答