介绍
首先,承认,我是一个 .NET 人,我知道这种方法使用了一些在 Java 中没有直接等效的习语。但我相信你的话,并继续基于这是一个 .NET 人员会喜欢的好问题,并希望它会引导你在 rx-java 中走上正确的道路,这是我从未看过的。这是一个很长的答案,但主要是解释 - 解决方案代码本身很短!
使用任何一个
我们首先需要整理一些工具来帮助解决这个问题。首先是Either<TLeft, TRight>
类型的使用。这很重要,因为每次调用都有两种可能的结果,要么是好的结果,要么是错误的。但是我们需要将它们包装在一个类型中——我们不能使用 OnError 将错误发送回来,因为这会终止结果流。要么看起来有点像元组,而且更容易处理这种情况。Rxx 库有一个非常完整和良好的实现Either
,但这里有一个简单的通用示例,后面是一个对我们的目的有益的简单实现:
var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());
/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
public abstract bool IsLeft { get; }
public bool IsRight { get { return !IsLeft; } }
public abstract TLeft Left { get; }
public abstract TRight Right { get; }
}
public static class Either
{
public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
{
TLeft _leftValue;
public LeftValue(TLeft leftValue)
{
_leftValue = leftValue;
}
public override TLeft Left { get { return _leftValue; } }
public override TRight Right { get { return default(TRight); } }
public override bool IsLeft { get { return true; } }
}
public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
{
TRight _rightValue;
public RightValue(TRight rightValue)
{
_rightValue = rightValue;
}
public override TLeft Left { get { return default(TLeft); } }
public override TRight Right { get { return _rightValue; } }
public override bool IsLeft { get { return false; } }
}
// Factory functions to create left or right-valued Either instances
public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
{
return new LeftValue<TLeft, TRight>(leftValue);
}
public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
{
return new RightValue<TLeft, TRight>(rightValue);
}
}
请注意,按照惯例,当使用 Either 来模拟成功或失败时,右侧用于成功值,因为它当然是“正确的”:)
一些辅助函数
我将使用一些辅助函数来模拟您问题的两个方面。首先,这是一个生成参数的工厂——每次调用它都会返回从 1 开始的整数序列中的下一个整数:
// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
return ++count;
}
接下来,这是一个将您的 Rest 调用模拟为 IObservable 的函数。这个函数接受一个整数并且:
- 如果整数是偶数,则返回一个立即发送 OnError 的 Observable。
- 如果整数是奇数,则返回一个字符串,该字符串将整数与“-ret”连接起来,但仅在一秒钟之后。我们将使用它来检查轮询间隔是否按照您的要求运行 - 作为已完成调用之间的暂停,无论它们花费多长时间,而不是定期间隔。
这里是:
// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
return x % 2 == 0
? Observable.Throw<string>(new Exception())
: Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));
}
现在好位
下面是我调用的一个相当通用的可重用函数Poll
。它接受将被轮询的异步函数、该函数的参数工厂、所需的休息(不是双关语!)间隔,最后是要使用的 IScheduler。
我能想到的最简单的方法是Observable.Create
使用调度程序来驱动结果流。ScheduleAsync
是一种使用 .NET async/await 形式的调度方式。这是一个 .NET 习惯用法,允许您以命令式方式编写异步代码。该async
关键字引入了一个异步函数,该函数可以await
在其主体中进行一个或多个异步调用,并且只有在调用完成时才会继续。我在这个问题中写了一个关于这种调度风格的长篇解释,其中包括在 rx-java 方法中可能更容易实现的旧递归风格。代码如下所示:
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
try
{
var result = await asyncFunction(parameterFactory());
observer.OnNext(Either.Right<Exception,TResult>(result));
}
catch(Exception ex)
{
observer.OnNext(Either.Left<Exception, TResult>(ex));
}
await ctrl.Sleep(interval, ct);
}
});
});
}
打破这一点,Observable.Create
通常是一个用于创建 IObservables 的工厂,它可以让您对如何将结果发布给观察者有很大的控制权。它经常被忽视,有利于不必要地复杂的基元组合。
在这种情况下,我们使用它来创建一个流,Either<TResult, Exception>
以便我们可以返回成功和失败的轮询结果。
该Create
函数接受一个表示订阅者的观察者,我们通过 OnNext/OnError/OnCompleted 将结果传递给该订阅者。IDisposable
我们需要在调用中返回一个Create
- 在 .NET 中,这是订阅者可以取消订阅的句柄。这在这里特别重要,因为否则投票将永远持续下去——或者至少永远不会OnComplete
。
ScheduleAsync
(或 plain )的结果Schedule
就是这样一个句柄。处理后,它将取消我们计划的任何未决事件 - 从而结束轮询循环。在我们的例子中,Sleep
我们用来管理间隔的是可取消操作,尽管可以轻松修改 Poll 函数以接受一个asyncFunction
也接受 a的可取消操作CancellationToken
。
ScheduleAsync 方法接受一个函数,该函数将被调用来安排事件。它传递了两个参数,第一个ctrl
是调度程序本身。第二个ct
是 CancellationToken,我们可以使用它来查看是否已请求取消(通过订阅者处理其订阅句柄)。
轮询本身是通过无限循环执行的,该循环仅在 CancellationToken 指示已请求取消时终止。
在循环中,我们可以使用 async/await 的魔力来异步调用轮询函数,但仍将其包装在异常处理程序中。这太棒了!假设没有错误,我们将结果作为 an 的正确值通过 发送Either
给观察者OnNext
。如果有异常,我们将其作为 an 的左值发送Either
给观察者。最后,我们使用Sleep
调度器上的函数在休息间隔之后安排唤醒调用 - 不要与Thread.Sleep
调用混淆,这通常不会阻塞任何线程。请注意,睡眠也接受CancellationToken
中止的启用!
我想你会同意这是一个非常酷的使用 async/await 来简化本来非常棘手的问题!
示例用法
最后,这里有一些测试代码调用Poll
,以及示例输出 - 对于LINQPad粉丝,此答案中的所有代码一起将在 LINQPad 中运行,并引用 Rx 2.1 程序集:
void Main()
{
var subscription = Poll(SomeRestCall,
ParameterFactory,
TimeSpan.FromSeconds(5),
ThreadPoolScheduler.Instance)
.TimeInterval()
.Subscribe(x => {
Console.Write("Interval: " + x.Interval);
var result = x.Value;
if(result.IsRight)
Console.WriteLine(" Success: " + result.Right);
else
Console.WriteLine(" Error: " + result.Left.Message);
});
Console.ReadLine();
subscription.Dispose();
}
Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.
请注意,如果立即返回错误,则结果之间的间隔为 5 秒(轮询间隔),或成功结果为 6 秒(轮询间隔加上模拟的 REST 调用持续时间)。
编辑 - 这是一个不使用 ScheduleAsync的替代实现,但使用旧式递归调度并且没有 async/await 语法。如您所见,它要麻烦得多——但它也支持取消 asyncFunction observable。
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(
observer =>
{
var disposable = new CompositeDisposable();
var funcDisposable = new SerialDisposable();
bool cancelRequested = false;
disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
disposable.Add(funcDisposable);
disposable.Add(scheduler.Schedule(interval, self =>
{
funcDisposable.Disposable = asyncFunction(parameterFactory())
.Finally(() =>
{
if (!cancelRequested) self(interval);
})
.Subscribe(
res => observer.OnNext(Either.Right<Exception, TResult>(res)),
ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
}));
return disposable;
});
}
有关避免 .NET 4.5 async/await 功能且不使用 Schedule 调用的不同方法,请参阅我的其他答案。
我确实希望这对 rx-java 人有所帮助!