我有IObservable<Packet>
一个 hot observable,它允许不同的订阅分析传入的数据包。
我想编写一个方法,发送一些带有 ID 的数据,并等待具有相同 ID 的响应。伪代码:
void SendData(byte[] data, int retries, int timeout, Action<Packet> success, Action fail)
{
var sequenceId = GetSequenceId();
_port.SendData(data, sequenceId);
_packetStream.Where(p => p.SequenceId == sequenceId)
.Take(1)
.WaitForTimeout(timeout)
.WaitForRetry(retries)
.Subscribe(success) //Need to unsubscribe after packet is received
//If we didn't receive an answer packet, then call fail() action
}
真的不知道,这些东西通常是如何使用 Reactive Extensions 完成的。很高兴收到一些建议。谢谢。