我想要做的声明如下所示:
// Checks input source for timeouts, based on the number of elements received
// from clock since the last one received from source.
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
this IObservable<T1> source,
IObservable<T2> clock,
int countForTimeout,
Func<R> timedOutSelector,
Func<T1, R> okSelector)
大理石图在 ascii 中很困难,但这里有:
source --o---o--o-o----o-------------------o---
clock ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---
我已经尝试寻找Observable
可以组合source
并clock
以我可以使用的方式的现有函数,但是大多数组合函数依赖于接收“每个”(And
,Zip
),或者它们从'缺少'一个 ( CombineLatest
),或者它们离我需要的太远了 ( Amb
, GroupJoin
, Join
, Merge
, SelectMany
, Timeout
)。Sample
看起来很接近,但我不想将源吞吐量限制为时钟速率。
所以现在我被困在试图填补这里的巨大空白:
return new AnonymousObservable<R>(observer =>
{
//One observer, two observables??
});
抱歉,“您尝试过什么”部分在这里有点弱:假设我已经尝试过思考它!我不是要求完整的实施,只是:
- 是否有一个内置功能可以帮助我,我错过了?
- 如何构建一个订阅两个可观察对象的基于 lambda 的观察者?