0

我有一个 observableGroupBy用于获取多个流。我实际上想要Scan每个子流的结果。假设 observable 超过产品价格,扫描结果是每种产品类型的平均价格。

我有另一个与这些“产品”相关的事件流(比如说“显示产品价格”事件),我想将它与上一个流的最新产品价格结合起来。因此,Scan每组的输出需要与事件流的每个元素相结合,以获得该事件产品的最新平均价格。

出于某种原因,我无法获得正确的语法,并且我整天都在抨击这一点。有人可以帮忙吗?


更新

我添加下面的代码来说明大概的意图。

 public class Node
{
    private List<int> Details = new List<int>();

    public void AddInfo(int x)
    {
        Details.Add(x );
    }

    public Node(int x)
    {
        Details.Add(x);  
    }

    public int Index => Details[0]%10; //just to simplify the grouping and debugging

    public int Latest => Details.Last();
}

public class Message
{
    private static Random _random = new Random();

    public int MessageNodeInfo { get; private set; }

    public Message()
    {
        MessageNodeInfo = _random.Next(); 
    }
}


public class AccumulatingInfoTest
{


    private static Random _random=new Random();

    private IObservable<Message> MessageStream()
    {
        TimeSpan timeSpan = TimeSpan.FromSeconds(0.5);


        var ret= Observable.Generate(0,
            _ => { return true; }, 
            _ => { return 0; }, 
            _ => { return new Message(); },
            _=> timeSpan)
            .Publish()
            .RefCount();



        return ret;

    }


    public class ArbitraryCommonClass
    {
        public int K { get; set; }
        public Message M { get; set; }
        public Node D { get; set; }

        public ArbitraryCommonClass Combine(ArbitraryCommonClass a)
        {
            return new ArbitraryCommonClass()
            {
                K = this.K,
                M = this.M ?? a.M,
                D = this.D ?? a.D
            };
        }
    }

    public void Start()
    {

        var inputStream = MessageStream();

        inputStream.Subscribe(y => Console.WriteLine("Input: K " + y.MessageNodeInfo % 10 + " V " + y.MessageNodeInfo));


        var nodeInfoStream = inputStream
            .Select(nodeInfo => new Node(nodeInfo.MessageNodeInfo))
            .GroupBy(node => node.Index)
            .Select(groupedObservable => new
                        {
                            Key = groupedObservable.Key,
                            Observable = groupedObservable
                                .Scan(

                                    (nodeAcc, node) => { nodeAcc.AddInfo(node.Latest); return nodeAcc; }

                                    )
                                .Select(a => new ArbitraryCommonClass() { K = a.Index, M = (Message)null, D = a })

                        }
                    );

        var groupedMessageStream =
            inputStream
            .GroupBy(
                    m => new Node(m.MessageNodeInfo).Index
                    )
            .Select(a => new
                        {
                            Key =a.Key,
                            Observable = a.Select(b => new ArbitraryCommonClass() { K = a.Key, M = b, D = null })

                        });



        var combinedStreams = nodeInfoStream
            .Merge(groupedMessageStream)
            .GroupBy(s => s.Key)
            .Select(grp => grp
                .Scan(

                    (state, next) => new { Key = state.Key, Observable = Observable.CombineLatest(state.Observable, next.Observable, (x, y) => { return x.Combine(y); }) }
                )



            )
            .Merge()
            .SelectMany(x => x.Observable.Select(a=>a));

        combinedStreams.Where(x=>x.M!=null).Subscribe(x => Console.WriteLine(x.K + " " + x.M.MessageNodeInfo + " " + x.D.Latest));














    }
}
4

1 回答 1

1

假设以下类:

public class Product
{
    public string Type { get; set; } = "Default";
    public decimal Price { get; set; }
}

这是GroupBywith的用法Scan(显示按类型分组的平均产品价格)。诀窍是Select通过分组的 observable 到达各个分组,做任何事情,然后(大概)将它们合并在一起。Select您可以将 the和 the折叠Merge成一个SelectMany,但分开时会更容易阅读:

var productSubject = new Subject<Product>();
var printSignal = new Subject<Unit>();

var latestAverages = productSubject.GroupBy(p => p.Type)
    .Select(g => g
        .Scan((0, 0.0m), (state, item) => (state.Item1 + 1, state.Item2 + item.Price)) //hold in state the count and the running total for each group
        .Select(t => (g.Key, t.Item2 / t.Item1)) //divide to get the average
    )
    .Merge()
    .Scan(ImmutableDictionary<string, decimal>.Empty, (state, t) => state.SetItem(t.Key, t.Item2)); //Finally, cache the average by group.


printSignal.WithLatestFrom(latestAverages, (_, d) => d)
    .Subscribe(avgs =>
    {
        foreach (var avg in avgs)
        {
            Console.WriteLine($"ProductType: {avg.Key}. Average: {avg.Value}");
        }
        Console.WriteLine();
    });

var productsList = new List<Product>()
{
    new Product { Price = 1.00m },
    new Product { Price = 2.00m },
    new Product { Price = 3.00m },

    new Product { Price = 2.00m, Type = "Alternate" },
    new Product { Price = 4.00m, Type = "Alternate" },
    new Product { Price = 6.00m, Type = "Alternate" },
};

productsList.ForEach(p => productSubject.OnNext(p));

printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 4.0m });
printSignal.OnNext(Unit.Default);
productSubject.OnNext(new Product { Price = 8.0m, Type = "Alternate" });
printSignal.OnNext(Unit.Default);

这使用 nuget 包System.Collections.Immutable

于 2018-04-17T15:29:32.690 回答