1

假设有两个 observables o1, o2。第一个从内部进程接收事件(在很长的计算完成后),第二个通过 REST 端点接收外部事件(表示另一个外部组件也完成了)。事件数据只是一个 ID。

现在我想设计一个工作流,以便只有当两个可观察对象中都存在一个 ID 时,才会发出一个新事件(即当内部和外部计算完成时)。

让某个时间点o1包含 IDs {1,2,3},然后我想区分这些情况:

  1. 正常情况:例如 ID2到达o2。两个 ID 现在都存在于两个 observables 中,输出“Success: 2”
  2. 过期案例:内部计算完成后的一段时间,外部事件尚未到达。例如 ID2存在o1o2即使在一小时后也不存在,输出:“Expired: 2”
  3. 未知情况:ID(例如 4)通过 REST 端点到达,o2但该端点不存在o1,可能是因为 ID 已过期或仅仅是因为外部组件有故障,输出:“未知:3”

我找到了groupJoin可能做我想做的事情的操作符,这里甚至是一个属性匹配的例子:GroupJoin - Joins two streams matching by a attributes

但是,每次新事件到达时,此示例似乎都会对所有元素执行耗尽(线性时间)扫描。我认为可以推出我自己的版本来在恒定时间内检查地图,但是:我想知道是否有一种规范的方式甚至是开箱即用的功能(因为我猜这是一个很常见的用例)。

(而且我是 Rx 的新手,为这种加入操作实现过期案例的最佳方法是什么)

4

3 回答 3

2

我会通过在外部对象中具有中间状态来做到这一点:

public class ItemJoinCache<T> {
   private Map<Integer, T> items;
   public Observable<T> ingestInternal(T item) {
      // an internal item arrived, do the necessary work
   }
   public Observable<T> ingestExternal(T item) {
      // an external item arrived, do the necessary work
   }
}

externalRestCallThatReturnsObservable()
.flatMap(myItemJoinCache::ingestExternal)
...

internalProcessThatTakesALongTime()
.flatMap(myItemJoinCache::ingestInternal)
...

通过这种方式,您可以进行您可能需要的任何类型的处理。

于 2017-05-26T10:51:36.847 回答
1

您也标记了问题 rx.net,所以我将假设在 C# 中给出答案是奢侈的。我不确定这对 Java 的翻译效果如何,如果那是您正在寻找的。

Rx并不是真的为此:它们是基于时间窗口加入的JoinGroupJoin您希望通过 ID 加入。

一个 Rx 友好的解决方案将是有效的。因为你需要一些状态,所以我们可以使用一个不可变的状态烘焙到一个Scan函数中。在 C# 中,有ImmutableDictionary<TKey, TItem>来自 Nuget package System.Collections.Immutable。我不确定Java中有没有等价物。

鉴于这些类:

public class CustomEvent
{
    public int Id { get; set; }
}

public class Result
{
    public ResultType Type { get; set; }
    public int Id { get; set; }
}

public enum ResultType
{
    Success,
    Unknown,
    Expired
}

你可以得到这样的解决方案:

IObservable<CustomEvent> o1;
IObservable<int> o2;
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1);

IObservable<Result> results = Observable.Merge(
    o1.SelectMany(ce => Observable.Merge(
        Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h => 
            Tuple.Create(h.Add(ce.Id, ce), default(Result), false)
        )),
        Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
            h.ContainsKey(ce.Id)
                ? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true)
                : Tuple.Create(h, default(Result), false)
        ))
            .Delay(expirationTimeDelay)
    )),
    o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
        h.ContainsKey(id)
            ? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true)
            : Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true)
    ))
)
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1))
.Where(t => t.Item3)
.Select(t => t.Item2);

不可变字典是我们的核心状态,并保存来自o1. accumulator 函数返回一个具有三个属性的元组:表示我们的核心状态的不可变字典、一个结果对象和一个布尔值。Boolean 对象是一个过滤器,显示是否应该传播结果对象。

一个有趣的技巧Scan是反转正常用法:将项目流转换为在状态下工作的函数。在我们的例子中,函数的类型是 Func、Tuple、Results、Boolean>>(一个接收字典并返回包含三个值的元组的函数)。

这就是我们在这里所做的:每个o1项目弹出两个函数:一个将项目添加到不可变字典中(并且不推送结果)。一个小时后会出现另一个功能,以查看该事件是否尚未加入。如果加入,则不会发生任何事情。如果未加入,则会弹出 Expired 结果。每个o2项目都会弹出一个功能:检查项目是否在地图中。如果存在,将弹出 Normal 结果。如果不存在,则为未知。

如果您在 Java 中,并且没有容易获得的等价于ImmutableDictionary,那么您可能可以替换为常规HashMap,但您必须通过调用来防止来自多个订阅者的讨厌的状态问题Publish

于 2017-05-26T14:01:17.343 回答
0

您始终可以将 o1 减少为带有 的 Set scan。当 o2 发出一个值时,您从 o1 中获取最新的集合withLatestFrom并检查包含。Atimeout可以解决过期部分。RxJs 5 中的示例:

o2
.withLatestFrom(
  o1.scan((set, val) => set.add(val), new Set),
  (o2Val, o1Set) => o1Set.has(o2Val) ? "Success" : "Unknown"
)
.timeoutWith(3600000, Observable.of("Expire"))
.subscribe(console.log)
于 2017-05-26T14:31:46.050 回答