1

我有IObservable<Packet>一个 hot observable,它允许不同的订阅分析传入的数据包。

我想编写一个方法,发送一些带有 ID 的数据,并等待具有相同 ID 的响应。伪代码:

void SendData(byte[] data, int retries, int timeout, Action<Packet> success, Action fail) 
{
    var sequenceId = GetSequenceId();
    _port.SendData(data, sequenceId);
    _packetStream.Where(p => p.SequenceId == sequenceId)
                 .Take(1)
                 .WaitForTimeout(timeout)
                 .WaitForRetry(retries)
                 .Subscribe(success) //Need to unsubscribe after packet is received
    //If we didn't receive an answer packet, then call fail() action 
}

真的不知道,这些东西通常是如何使用 Reactive Extensions 完成的。很高兴收到一些建议。谢谢。

4

2 回答 2

1

您问题中的代码看起来很接近正确。Rx 框架中存在两种“等待”方法(TimeoutRetry)。我建议您更改方法以返回 anIObservable并删除successandfail参数。这样做“让你保持在 monad 中”,并让你在需要时将更多的操作符链接到 observable 上。当您订阅结果 observable(分别为 OnNext 和 OnError)时,将使用成功和失败参数。

我假设数据应该在超时时重新发送(否则你并没有真正重试)。为此,您可以使用Observable.Create在订阅时发送数据。

IObservable<Packet> SendData(byte[] data, int retries, TimeSpan timeout)
{
    //only get the sequence id once per call to SendData, regardless of retries
    var sequenceId = GetSequenceId();
    return Observable.Create(obs =>
        {   //this code runs every time you subscribe
            _port.SendData(data, sequenceId);
            return _packetStream.Where(p => p.SequenceId == sequenceId)
                                .Take(1)
                                .Timeout(timeout)
                                .Subscribe(obs)
        })
        .Retry(retries); 
}

Retry操作符放在最后会导致 Create observable 在超时时重试。顺便说一句,Timeout 的重载允许您传入另一个可观察的序列以在超时的情况下使用。如果需要,您可以使用此重载以及Observable.Throw在超时的情况下提供您自己的异常,例如提供备用错误消息。

请注意,此代码在您订阅之前不会发送数据,并且在返回结果或达到超时之前不会阻塞,但确实允许您通过 Dispose 订阅取消进一步的重试。此代码也不会阻止您同时发送多个数据包。如果你必须阻止,你可以这样做:

var response = SendData(/* arguments */);
response.Do(success, fail).StartWith(null).ToTask().Wait();

If you are using C# 5 and calling this within an async method, you can await the observable.

于 2012-12-28T19:15:14.327 回答
0

到目前为止,我得到的是:

private void WaitForAnswer(byte sequenceId, int timeout, Action<Packet> success, Action fail, int retriesLeft)
{
    _packetStream.Where(p => p.GetSequence() == sequenceId)
                 .Take(1)
                 .Timeout(TimeSpan.FromMilliseconds(timeout))
                 .Subscribe(success, 
                            _ => {
                                if (retriesLeft > 0) WaitForAnswer(sequenceId, timeout, success, fail, --retriesLeft);
                                else fail();
                            }, 
                            () => { });
}

不过,我不太确定此解决方案是否正确处理订阅。

于 2012-12-28T10:36:43.397 回答