我有一个 observableGroupBy
用于获取多个流。我实际上想要Scan
每个子流的结果。假设 observable 超过产品价格,扫描结果是每种产品类型的平均价格。
我有另一个与这些“产品”相关的事件流(比如说“显示产品价格”事件),我想将它与上一个流的最新产品价格结合起来。因此,Scan
每组的输出需要与事件流的每个元素相结合,以获得该事件产品的最新平均价格。
出于某种原因,我无法获得正确的语法,并且我整天都在抨击这一点。有人可以帮忙吗?
更新
我添加下面的代码来说明大概的意图。
public class Node
{
private List<int> Details = new List<int>();
public void AddInfo(int x)
{
Details.Add(x );
}
public Node(int x)
{
Details.Add(x);
}
public int Index => Details[0]%10; //just to simplify the grouping and debugging
public int Latest => Details.Last();
}
public class Message
{
private static Random _random = new Random();
public int MessageNodeInfo { get; private set; }
public Message()
{
MessageNodeInfo = _random.Next();
}
}
public class AccumulatingInfoTest
{
private static Random _random=new Random();
private IObservable<Message> MessageStream()
{
TimeSpan timeSpan = TimeSpan.FromSeconds(0.5);
var ret= Observable.Generate(0,
_ => { return true; },
_ => { return 0; },
_ => { return new Message(); },
_=> timeSpan)
.Publish()
.RefCount();
return ret;
}
public class ArbitraryCommonClass
{
public int K { get; set; }
public Message M { get; set; }
public Node D { get; set; }
public ArbitraryCommonClass Combine(ArbitraryCommonClass a)
{
return new ArbitraryCommonClass()
{
K = this.K,
M = this.M ?? a.M,
D = this.D ?? a.D
};
}
}
public void Start()
{
var inputStream = MessageStream();
inputStream.Subscribe(y => Console.WriteLine("Input: K " + y.MessageNodeInfo % 10 + " V " + y.MessageNodeInfo));
var nodeInfoStream = inputStream
.Select(nodeInfo => new Node(nodeInfo.MessageNodeInfo))
.GroupBy(node => node.Index)
.Select(groupedObservable => new
{
Key = groupedObservable.Key,
Observable = groupedObservable
.Scan(
(nodeAcc, node) => { nodeAcc.AddInfo(node.Latest); return nodeAcc; }
)
.Select(a => new ArbitraryCommonClass() { K = a.Index, M = (Message)null, D = a })
}
);
var groupedMessageStream =
inputStream
.GroupBy(
m => new Node(m.MessageNodeInfo).Index
)
.Select(a => new
{
Key =a.Key,
Observable = a.Select(b => new ArbitraryCommonClass() { K = a.Key, M = b, D = null })
});
var combinedStreams = nodeInfoStream
.Merge(groupedMessageStream)
.GroupBy(s => s.Key)
.Select(grp => grp
.Scan(
(state, next) => new { Key = state.Key, Observable = Observable.CombineLatest(state.Observable, next.Observable, (x, y) => { return x.Combine(y); }) }
)
)
.Merge()
.SelectMany(x => x.Observable.Select(a=>a));
combinedStreams.Where(x=>x.M!=null).Subscribe(x => Console.WriteLine(x.K + " " + x.M.MessageNodeInfo + " " + x.D.Latest));
}
}