2

使用响应式扩展,很容易订阅同一个 observable 2 次。当 observable 中有一个新值可用时,两个订阅者都会使用相同的值调用。

有没有办法让每个订阅者从这个 observable 中获得不同的值(下一个)?

例如我所追求的:
源序列:[1,2,3,4,5,...](无限)
源不断以未知的速度添加新项目。
我正在尝试使用 N 个订阅者为每个项目执行一个冗长的异步操作。

第一个订阅者:1,2,4,...
第二个订阅者:3,5,...
...

第一个订阅者:1,3,...
第二个订阅者:2,4,5,...
.. .

第一个订阅者:1,3,5,...
第二个订阅者:2,4,6,...

4

2 回答 2

1

怎么样:

IObservable<TRet> SomeLengthyOperation(T input) 
{
    return Observable.Defer(() => Observable.Start(() => {
        return someCalculatedValueThatTookALongTime;
    }, Scheduler.TaskPoolScheduler));
}

someObservableSource
    .SelectMany(x => SomeLengthyOperation(input))
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

您甚至可以限制并发操作的数量:

someObservableSource
    .Select(x => SomeLengthyOperation(input))
    .Merge(4 /* at a time */)
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

Merge(4) 的工作很重要,SomeLengthyOperation 返回的 Observable 是一个Cold Observable,这就是 Defer 在这里所做的 - 它使 Observable.Start 在有人订阅之前不会发生。

于 2012-02-07T21:02:02.373 回答
1

我同意阿斯蒂的观点。

您可以使用 Rx 填充队列(阻塞集合),然后让竞争的消费者从队列中读取。这样,如果一个进程由于某种原因更快,如果它仍然很忙,它可能会在另一个消费者之前拿起下一个项目。

但是,如果您想这样做,反对好的建议:),那么您可以只使用 Select 运算符,它将为您提供每个元素的索引。然后,您可以将其传递给您的订阅者,他们可以适应模数。(糟糕!泄漏的抽象、幻数、潜在的阻塞、对源序列的潜在副作用等)

var source = Obserservable.Interval(1.Seconds())
  .Select((i,element)=>{new Index=i, Element=element});

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element));
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element));

还要记住,如果 OnNext 处理程序处于阻塞状态,它所做的工作仍然会阻塞它所在的调度程序。这可能会影响您的源/生产者的速度。Asti 的答案是更好选择的另一个原因。

询问是否不清楚:-)

于 2012-02-06T18:39:31.213 回答