您也标记了问题 rx.net,所以我将假设在 C# 中给出答案是奢侈的。我不确定这对 Java 的翻译效果如何,如果那是您正在寻找的。
Rx并不是真的为此:它们是基于时间窗口加入的Join
。GroupJoin
您希望通过 ID 加入。
一个 Rx 友好的解决方案将是有效的。因为你需要一些状态,所以我们可以使用一个不可变的状态烘焙到一个Scan
函数中。在 C# 中,有ImmutableDictionary<TKey, TItem>
来自 Nuget package System.Collections.Immutable
。我不确定Java中有没有等价物。
鉴于这些类:
public class CustomEvent
{
public int Id { get; set; }
}
public class Result
{
public ResultType Type { get; set; }
public int Id { get; set; }
}
public enum ResultType
{
Success,
Unknown,
Expired
}
你可以得到这样的解决方案:
IObservable<CustomEvent> o1;
IObservable<int> o2;
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1);
IObservable<Result> results = Observable.Merge(
o1.SelectMany(ce => Observable.Merge(
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
Tuple.Create(h.Add(ce.Id, ce), default(Result), false)
)),
Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(ce.Id)
? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true)
: Tuple.Create(h, default(Result), false)
))
.Delay(expirationTimeDelay)
)),
o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
h.ContainsKey(id)
? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true)
: Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true)
))
)
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1))
.Where(t => t.Item3)
.Select(t => t.Item2);
不可变字典是我们的核心状态,并保存来自o1
. accumulator 函数返回一个具有三个属性的元组:表示我们的核心状态的不可变字典、一个结果对象和一个布尔值。Boolean 对象是一个过滤器,显示是否应该传播结果对象。
一个有趣的技巧Scan
是反转正常用法:将项目流转换为在状态下工作的函数。在我们的例子中,函数的类型是 Func、Tuple、Results、Boolean>>(一个接收字典并返回包含三个值的元组的函数)。
这就是我们在这里所做的:每个o1
项目弹出两个函数:一个将项目添加到不可变字典中(并且不推送结果)。一个小时后会出现另一个功能,以查看该事件是否尚未加入。如果加入,则不会发生任何事情。如果未加入,则会弹出 Expired 结果。每个o2
项目都会弹出一个功能:检查项目是否在地图中。如果存在,将弹出 Normal 结果。如果不存在,则为未知。
如果您在 Java 中,并且没有容易获得的等价于ImmutableDictionary
,那么您可能可以替换为常规HashMap
,但您必须通过调用来防止来自多个订阅者的讨厌的状态问题Publish
。