我有以下类型的热门事件流:
Event
{
string name;
int state ; // its 1 or 2 ie active or unactive
}
有一个函数提供给定名称的父名称 - string GetParent(string name)
我需要为每个父级缓冲事件 2 分钟,如果在这 2 分钟内,我为给定父级的 state =2 的子级接收任何事件,则该缓冲区应取消并应输出 0 否则我将获得事件 recvd 的计数。
我知道我必须使用 GroupBy 进行分区,然后缓冲然后计数,但我无法想出一种方法来创建每个父级唯一的 Buffer,我虽然使用 Distinct 但这并不能解决问题,因为我只是不想创建缓冲区,直到父级处于活动状态(一旦父级的缓冲区被取消或 2 分钟结束,父级缓冲区可以再次创建)所以我知道我需要创建一个自定义缓冲区来检查创建缓冲区的条件,但是我如何通过响应式扩展来做到这一点。任何帮助将不胜感激。问候
感谢布兰登的帮助。这是我用于测试的主要程序。它不工作。因为我是反应式扩展的新手,所以我正在测试的方式可能是这样
namespace TestReactive
{
class Program
{
static int abc = 1;
static void Main(string[] args)
{
Subject<AEvent> memberAdded = new Subject<AEvent>();
//ISubject<AEvent, AEvent> syncedSubject = new ISubject<AEvent, AEvent>();
var timer = new Timer { Interval = 5 };
timer.Enabled = true;
timer.Elapsed += (sender, e) => MyElapsedMethod(sender, e, memberAdded);
var bc = memberAdded.Subscribe();
var cdc = memberAdded.GroupBy(e => e.parent)
.SelectMany(parentGroup =>
{
var children = parentGroup.Publish().RefCount();
var inactiveChild = children.SkipWhile(c => c.state != 2).Take(1).Select(c => 0);
var timer1 = Observable.Timer(TimeSpan.FromSeconds(1));
var activeCount = children.TakeUntil(timer1).Count();
return Observable.Amb(activeCount, inactiveChild)
.Select(count => new { ParentName = parentGroup.Key, Count = count });
});
Observable.ForEachAsync(cdc, x => WriteMe("Dum Dum " + x.ParentName+x.Count));
// group.Dump("Dum");
Console.ReadKey();
}
static void WriteMe(string sb)
{
Console.WriteLine(sb);
}
static void MyElapsedMethod(object sender, ElapsedEventArgs e, Subject<AEvent> s)
{
AEvent ab = HelperMethods.GetAlarm();
Console.WriteLine(abc + " p =" + ab.parent + ", c = " + ab.name + " ,s = " + ab.state);
s.OnNext(ab);
}
}
}
public static AEvent GetAlarm()
{
if (gp> 4)
gp = 1;
if (p > 4)
p = 1;
if (c > 4)
c = 1;
AEvent a = new AEvent();
a.parent = "P" + gp + p;
a.name = "C" + gp + p + c;
if (containedKeys.ContainsKey(a.name))
{
a.state = containedKeys[a.name];
if (a.state == 1)
containedKeys[a.name] = 2;
else
containedKeys[a.name] = 1;
}
else
{
containedKeys.TryAdd(a.name, 1);
}
gp++; p++; c++;
return a;
}
因此,此方法在每次滴答时为 Parent 生成一个事件。它为状态 = 1 的父 P11、P22、P33、P44 生成事件,然后是状态 = 2 的父 P11、P22、P33、P44 的事件我正在使用 Observable.ForEach 打印结果,我看到它被调用4次,之后什么都没有,就像取消组一样没有发生