2

给定 2 个热可观察对象 t1 和 t2,我将如何 GoupJoin 以便我从 t2 获取所有事件,这些事件发生在 t1 中每个事件之前 x 秒和 y 秒之后?

鉴于:

t1 -----A-----B-----C

t2 --1--2--3--4--5--6

如果 t1 相隔 2 秒,t2 相隔 1 秒,我们正在寻找每个 t1 事件两侧相隔 1 秒的 t2 事件,结果如下。

结果:

{ A, [1,2,3] }

{ B, [3,4,5] }

{ C, [5,6] }

下面是一个真实的例子,我们需要解决上述问题。我们有一个电子邮件流和另一个文本消息流。我们需要发出另一个流的结果,其中包含电子邮件,并且文本消息发生在电子邮件发送时间之前或之后的 1 分钟内。

4

2 回答 2

1

这里的问题(正如 Shlomo 提到的)是我们需要在事件发生t2之前打开窗口。t1不幸的是,这是不可能的,因为一旦我们到达事件,t1我们已经过了需要在 中打开窗口的点t2

相反,我们可以做的是t2使用 Delay() 将时间向前移动。如果我们将其抵消x(之前的时间),我们可以将问题重新定义为“获取在打开和关闭t2的窗口中发生的事件。我们可以使用 GroupJoin 来解决这个问题。t1t1 + x + y

var scheduler = new HistoricalScheduler();

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
    .Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

var delayedT2 = t2.Delay(x, scheduler);

var g = t1.GroupJoin(delayedT2 ,
    _ => Observable.Timer(x + y, scheduler),
    _ => Observable.Empty<Unit>(scheduler),
    (a, b) => new { a, b}
);

scheduler.Start();

这给出了结果:

{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }

这个结果仍然不是你所期望的。这是因为在您的示例t2中,事件发生在完全相同的即时t1事件中。在这种情况下,t1 + y首先处理事件并在包含事件之前关闭窗口t2。这意味着我们正在有效地获得(t1-01:00) <= t1 < (t1 + 01:00). 例如,窗口A是 01:0000 - 02.9999... 这就是为什么3不包括在 03:00 发生的原因。

y只需在我们的时间中添加一个刻度,就可以将其修复为具有包容性

var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1)); 
于 2018-01-11T11:54:30.933 回答
1

代码转储答案(使用 100 毫秒代替 1 秒):

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
    .Select(l => (char)('A' + l))
    .Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Delay(TimeSpan.FromMilliseconds(100));

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

var g = t1.Timestamp().Join(t2.Timestamp(),
    c => Observable.Timer(y),
    i => Observable.Timer(x + y),
    (c, i) => new {GroupItem = c, RightItem = i}
)
    .Where(a =>
        (a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
        || (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
    )
    .Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
    .GroupBy(a => a.GroupItem, a => a.RightItem);

解释: Join都是关于“windows”的。所以当你定义一个连接时,你必须考虑从左可观察和右可观察的每个项目打开的时间窗口。但是我们这里的窗口很难弄清楚:我们必须以某种方式在它发生之前打开一个窗口,让它在左边可观察到 X 时间,然后在它发生后 Y 时间关闭它。

而不是做不可能的事情,所以我们只在左侧项目出现后将其打开 Y 时间,并让右侧项目窗口由 X + Y 时间定义。但是,这会给我们留下不应该包含的项目。所以我们Where在时间戳上使用 a 来过滤掉那些。

最后,我们选择匿名类型和时间戳并将它们组合在一起。

我不认为这GroupJoin是要走的路:你最终会拆散小组并像我所做的那样重组它。

于 2018-01-03T15:38:15.143 回答