在 Rx.NET 中,是否可以在按 key 分组的 hot observable 中对最新项目进行采样?
例如,如果我有一个IObservable<Price>
,则在哪里Price
:
Price
- Key
- Bid
- Offer
让我们假设IObservable
链接到外部价格馈送。
我是否能够检索所有最新Price
的 s,按 分组Key
,使用 Rx 每 1 秒采样一次?
在 Rx.NET 中,是否可以在按 key 分组的 hot observable 中对最新项目进行采样?
例如,如果我有一个IObservable<Price>
,则在哪里Price
:
Price
- Key
- Bid
- Offer
让我们假设IObservable
链接到外部价格馈送。
我是否能够检索所有最新Price
的 s,按 分组Key
,使用 Rx 每 1 秒采样一次?
假设一些 observable source
,这将返回最后一秒进入的所有按 key 分组和采样的价格。
var sampled = source
.GroupBy(p => p.Key)
.SelectMany(o => o.Sample(TimeSpan.FromSeconds(1)));
如果有一些价格在最后一秒内没有收到消息,则不包括在内。
如果您想要包含旧价格,这将起作用:
var sampled2 = source
.Scan(ImmutableDictionary<int, Price>.Empty, (state, p) => state.SetItem(p.Key, p))
.Replay(1)
.RefCount();
var dummySubscription = sampled2.Subscribe();
var result = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => sampled2.Take(1).SelectMany(state => state.Values));
只要确保在处理完observableDummySubscription
时处理掉。result
这是做你想做的吗?
IObservable<ImmutableDictionary<string, Price>> sampled =
Observable
.Create<ImmutableDictionary<string, Price>>(o =>
{
var output = ImmutableDictionary<string, Price>.Empty;
return
source
.Do(x => output = output.SetItem(x.Key, x))
.Select(x => Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(y => output).StartWith(output))
.Switch()
.Subscribe(o);
});
hot observable 不会在内存中保留任何旧值,但您可以自己捕获每个已知键的最后价格,例如在字典中。
请参考以下示例代码。
Dictionary<string, double> _prices = new Dictionary<string, double>();
GetPrices()
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(prices =>
{
if (prices != null && prices.Count > 0)
{
var grouped = prices.GroupBy(x => x.Key);
foreach (var group in grouped)
_prices[group.Key] = group.Last().Bid;
}
//print out the last quote of each known price key
foreach (var price in _prices)
{
Console.WriteLine("Key: " + price.Key + ", last price: " + price.Value);
}
});
它应该每秒打印每个已知键的最后一个引号。
这是 Shlomo想法Scan
的完善版本,使用运算符和ImmutableDictionary
as 状态维护每个键的最新值。下面的自定义运算符 ( SampleByKey
) 以特定间隔对一系列关键元素进行采样。在每个采样滴答时IDictionary<TKey, TSource>
发出一个,其中包含迄今为止每个键发出的最新值。
public static IObservable<IDictionary<TKey, TSource>> SampleByKey<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TimeSpan interval,
IEqualityComparer<TKey> keyComparer = default)
{
return source
.Scan(ImmutableDictionary.Create<TKey, TSource>(keyComparer),
(dict, x) => dict.SetItem(keySelector(x), x))
.Publish(published => Observable
.Interval(interval)
.WithLatestFrom(published, (_, dict) => dict)
.TakeUntil(published.LastOrDefaultAsync()));
}
使用示例:
IObservable<IDictionary<string, Price>> sampled = priceFeed
.SampleByKey(p => p.Key, TimeSpan.FromSeconds(1.0));
如果在两次采样期间发射了零个元素source
,则将连续发射同一个字典。
这个实现与我之前在一个问题中发布的非常相似,关于如何以动态可变的时间间隔对序列进行采样。
注意:我删除了我之前的实现(修订版 1),因为它过于复杂且可能存在漏洞。Shlomo 的方法更易于理解和根据需要进行修改。