下面是一个PeriodicSequentialExecution
方法的两个实现,它通过以周期性方式执行异步方法来创建一个 observable,强制执行不重叠执行策略。可以延长后续执行之间的间隔以防止重叠,在这种情况下,周期会相应地进行时移。
第一个实现纯粹是功能性的,而第二个实现主要是命令式的。两种实现在功能上是相同的。第一个可以提供自定义的IScheduler
. 第二个可能效率更高一些。
功能实现:
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
Func<CancellationToken, Task<T>> action,
TimeSpan dueTime, TimeSpan period,
CancellationToken cancellationToken = default,
IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= DefaultScheduler.Instance;
return Delay(dueTime) // Initial delay
.Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken), linkedCTS =>
// Execution loop
Observable.Publish( // Start a hot delay timer before each operation
Delay(period), hotTimer => Observable
.StartAsync(() => action(linkedCTS.Token)) // Start the operation
.Concat(hotTimer) // Await the delay timer
)
.Repeat()
.Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
));
IObservable<T> Delay(TimeSpan delay)
=> Observable
.Timer(delay, scheduler)
.IgnoreElements()
.Select(_ => default(T))
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
命令式实现:
public static IObservable<T> PeriodicSequentialExecution2<T>(
Func<CancellationToken, Task<T>> action,
TimeSpan dueTime, TimeSpan period,
CancellationToken cancellationToken = default)
{
// Arguments validation omitted
return Observable.Create<T>(async (observer, ct) =>
{
using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
ct, cancellationToken))
{
try
{
await Task.Delay(dueTime, linkedCTS.Token);
while (true)
{
var delayTask = Task.Delay(period, linkedCTS.Token);
var result = await action(linkedCTS.Token);
observer.OnNext(result);
await delayTask;
}
}
catch (Exception ex) { observer.OnError(ex); }
}
});
}
该cancellationToken
参数可用于优雅终止生成的可观察序列。这意味着序列在终止之前等待当前运行的操作完成。如果您希望它立即终止,可能会使工作以一种即发即弃的方式运行而未被观察到,您可以像往常一样简单地处理对可观察序列的订阅。取消cancellationToken
以故障状态完成的可观察序列的结果 ( OperationCanceledException
)。