4

我有多个(2-10)个热可观察对象,每个在给定的时间窗口内触发一个事件,但不是按顺序。

假设我有 3 个主题,为了说明这一点,它们按任意顺序触发:

Subjects: sA, sB, sC

Order of time window 1:
  sB, then sA, then sC
Order of time window 2:
  sA, then sC, then sB
Order of time window 3:
  sA, then sB, then sC
etc.

因此,如果我要进行合并,我会得到以下流:sB、sA、sC、sA、sC、sB、sA、sB、sC 等。

现在,这是我的问题:我想强制每个时间窗口内的事件顺序。

1. sA
2. sB
3. sC

这将导致以下合并流:sA、sB、sC、sA、sB、sC、sA、sB、sC 等。

窗口时间本身是未知的,但是由于每个事件在每个窗口中只触发一次,我们可以假设一旦所有对象都触发了窗口,窗口就会关闭并打开一个新窗口。

任何想法如何优雅地做到这一点?

扩展问题(不那么重要):可观察对象彼此独立,因此我需要引入一个通用的静态方法,我可以在其中同步每个可观察对象的顺序。

public static IObservable<T> SynchronizeOrder<T>(IObservable<T> source, int order)
{
  //return synchronized source
}

更新:我最初的问题中不清楚的是,可观察对象可以是不同的事件类型(这就是我在示例中没有使用特定事件数据类型的原因),因此我不想合并有序的可观察对象. 我想有一种机制,我可以确保独立的可观察对象按它们的顺序同步。

myEventStream
 .ObserveOn(TaskFactory)
 .DoSomeExpensiveComputation()
 .Order(2) //doesn't have to be an extension method
 .Subscibe(computationResult=>
            sharedRessourceWhereOrderMatters.Update(computationResult))
4

2 回答 2

3

这可以使用可观察连接完成您想要的操作:

    using System.Reactive;
    using System.Reactive.Subjects;
    using System.Reactive.Linq;

    var a = new Subject<int>();
    var b = new Subject<int>();
    var c = new Subject<int>();

    var test = 
        Observable.When(
            a.And(b).And(c).Then((a1, b1, c1) => new int[] { a1, b1, c1 })
        ).SelectMany(arr => arr.ToObservable());

    var sub = test.Subscribe(val => Console.Write("{0} ", val));

    a.OnNext(1);
    b.OnNext(2);
    c.OnNext(3);

    a.OnNext(1);
    c.OnNext(3);
    b.OnNext(2);

    c.OnNext(3);
    a.OnNext(1);
    b.OnNext(2);

    Console.ReadKey();
    sub.Dispose();

输出是:

1 2 3 1 2 3 1 2 3

但请注意:

  1. 这不提供允许您以某种顺序注入特定可观察对象的扩展方法。您需要使用And(可以逐步完成)构建计划,然后最终执行它。“建立”顺序将是输出顺序。

  2. 这不看实际的窗口创建。但是,没有理由不能将其注入到可观察链中的某个地方。

  3. 每次所有三个可观察对象都具有值时,此解决方案将触发一次。您可能需要设置它并在正确的位置将其拆除以获得所需的行为。

于 2012-04-20T03:18:37.817 回答
1

yamens 的答案非常适合在同一范围内具有相同类型的 observables。在我的情况下(扩展问题),可观察对象可以是不同的类型,并且可以在执行的任何时候同步订单。

所以这是我提出的简单解决方案:

private readonly object syncRoot = new object();
private int sourceCount;
private readonly SortedDictionary<int, Action> buffer = new SortedDictionary<int, Action>();

public IObservable<T> Order<T>(IObservable<T> source, int order)
{
    return Observable.Create<T>(observer =>
    {
        lock (syncRoot)
            sourceCount++;

        source.Synchronize(syncRoot).Subscribe(item =>
        {
            buffer[order] = () => observer.OnNext(item);
            if(buffer.Count == sourceCount)
            {
                foreach (var action in buffer.Values)
                    action();
                buffer.Clear();
            }
        }, observer.OnError, observer.OnCompleted);

        return () =>
        {
            lock (syncRoot)
                sourceCount--;
        };
    });
}//end order
于 2012-04-20T19:07:43.930 回答