1

在 Rx.NET 中,是否可以在按 key 分组的 hot observable 中对最新项目进行采样?

例如,如果我有一个IObservable<Price>,则在哪里Price

Price 
- Key
- Bid
- Offer

让我们假设IObservable链接到外部价格馈送。

我是否能够检索所有最新Price的 s,按 分组Key,使用 Rx 每 1 秒采样一次?

4

4 回答 4

3

假设一些 observable source,这将返回最后一秒进入的所有按 key 分组和采样的价格。

var sampled = source
    .GroupBy(p => p.Key)
    .SelectMany(o => o.Sample(TimeSpan.FromSeconds(1)));

如果有一些价格在最后一秒内没有收到消息,则不包括在内。

如果您想要包含旧价格,这将起作用:

var sampled2 = source
    .Scan(ImmutableDictionary<int, Price>.Empty, (state, p) => state.SetItem(p.Key, p))
    .Replay(1)
    .RefCount();
var dummySubscription = sampled2.Subscribe();
var result = Observable.Interval(TimeSpan.FromSeconds(1))
    .SelectMany(_ => sampled2.Take(1).SelectMany(state => state.Values));

只要确保在处理完observableDummySubscription时处理掉。result

于 2017-08-22T20:29:52.553 回答
1

这是做你想做的吗?

IObservable<ImmutableDictionary<string, Price>> sampled =
    Observable
        .Create<ImmutableDictionary<string, Price>>(o =>
        {
            var output = ImmutableDictionary<string, Price>.Empty;
            return
                source
                    .Do(x => output = output.SetItem(x.Key, x))
                    .Select(x => Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(y => output).StartWith(output))
                    .Switch()
                    .Subscribe(o);
        });
于 2017-08-23T11:28:46.203 回答
0

hot observable 不会在内存中保留任何旧值,但您可以自己捕获每个已知键的最后价格,例如在字典中。

请参考以下示例代码。

Dictionary<string, double> _prices = new Dictionary<string, double>();

GetPrices()
    .Buffer(TimeSpan.FromSeconds(1))
    .Subscribe(prices =>
    {
        if (prices != null && prices.Count > 0)
        {
            var grouped = prices.GroupBy(x => x.Key);
            foreach (var group in grouped)
                _prices[group.Key] = group.Last().Bid;
        }

        //print out the last quote of each known price key
        foreach (var price in _prices)
        {
            Console.WriteLine("Key: " + price.Key + ", last price: " + price.Value);
        }
    });

它应该每秒打印每个已知键的最后一个引号。

于 2017-08-23T08:47:08.137 回答
0

这是 Shlomo想法Scan的完善版本,使用运算符和ImmutableDictionaryas 状态维护每个键的最新值。下面的自定义运算符 ( SampleByKey) 以特定间隔对一系列关键元素进行采样。在每个采样滴答时IDictionary<TKey, TSource>发出一个,其中包含迄今为止每个键发出的最新值。

public static IObservable<IDictionary<TKey, TSource>> SampleByKey<TSource, TKey>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    TimeSpan interval,
    IEqualityComparer<TKey> keyComparer = default)
{
    return source
        .Scan(ImmutableDictionary.Create<TKey, TSource>(keyComparer),
            (dict, x) => dict.SetItem(keySelector(x), x))
        .Publish(published => Observable
            .Interval(interval)
            .WithLatestFrom(published, (_, dict) => dict)
            .TakeUntil(published.LastOrDefaultAsync()));
}

使用示例:

IObservable<IDictionary<string, Price>> sampled = priceFeed
    .SampleByKey(p => p.Key, TimeSpan.FromSeconds(1.0));

如果在两次采样期间发射了零个元素source,则将连续发射同一个字典。

这个实现与我之前在一个问题中发布的非常相似,关于如何以动态可变的时间间隔对序列进行采样。

注意:我删除了我之前的实现(修订版 1),因为它过于复杂且可能存在漏洞。Shlomo 的方法更易于理解和根据需要进行修改。

于 2020-12-22T01:44:35.440 回答