想象一下,您有一个消息队列,其中包含以下 API
class MQ {
public MQ();
// send a single message from your message queue
public void send(string keyPath, string msg);
// Receive a single message from your message queue
public async Task<string> receive(keyPath);
}
为了使这个 RX 兼容
class MQRX: IObserver<string> {
MQ _mq;
string _keyPath
MQRX(string keyPath){
_mq = mq;
_keyPath = keyPath;
}
IObservable<string> Observe(){
return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
}
void OnNext(string msg){
_mq.send(msg);
}
void OnError(Exception e){
// The message queue might not
// support serializing exceptions
// or it might or you might build
// a protocol for it.
}
}
以容错方式使用它。注意 如果 OnError 传递的上游抛出异常,Retry 将重新订阅
new MQRX("users/1/article/2").
Retry().
Subscribe((msg)=>Console.Writeln(msg));
例如,在写作方面,您可以每两秒发送一条消息,如果出现错误,则重试订阅生成器。请注意,在 Observable.Interval 中不太可能出现错误,它只会在每个时间间隔生成一条消息,但想象一下从文件或其他消息队列中读取。
var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
Select((x)=>x.ToString()).
请注意,您可能应该使用 IObservable Catch 扩展方法,而不是盲目地重试,因为您可能会一遍又一遍地遇到相同的错误。重试()。订阅(MQ);