8

我想在一个限制条件下运行定期任务,即在任何给定时间最多只运行一个方法的执行。

我正在尝试使用 Rx,但我不确定如何最多施加一次并发限制。

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());

此外,如果任务仍在运行,我希望后续计划过去。即我不希望任务排队并导致问题。

我有 2 个这样的任务要定期执行。正在执行的任务当前是同步的。但是,如果有必要,我可以让它们异步。

4

4 回答 4

6

您应该按原样测试您的代码,因为这正是 Rx 已经强加的内容。

试试这个作为测试:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

当你运行它时,你会得到这样的输出:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119

它已经确保没有重叠。

于 2015-07-14T07:25:53.347 回答
5

您走在正确的轨道上,您可以使用Select+Concat来展平 observable 并限制 inflight 请求的数量(注意:如果您的任务花费的时间超过间隔时间,那么它们将开始堆积,因为它们无法快速执行足够的):

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();

source.Subscribe(/*Handle the result of each operation*/);
于 2015-07-14T07:14:33.713 回答
1

下面是一个PeriodicSequentialExecution方法的两个实现,它通过以周期性方式执行异步方法来创建一个 observable,强制执行不重叠执行策略。可以延长后续执行之间的间隔以防止重叠,在这种情况下,周期会相应地进行时移。

第一个实现纯粹是功能性的,而第二个实现主要是命令式的。两种实现在功能上是相同的。第一个可以提供自定义的IScheduler. 第二个可能效率更高一些。

功能实现:

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return Delay(dueTime) // Initial delay
        .Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken), linkedCTS => 
            // Execution loop
            Observable.Publish( // Start a hot delay timer before each operation
                Delay(period), hotTimer => Observable
                    .StartAsync(() => action(linkedCTS.Token)) // Start the operation
                    .Concat(hotTimer) // Await the delay timer
            )
            .Repeat()
            .Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
        ));

    IObservable<T> Delay(TimeSpan delay)
        => Observable
            .Timer(delay, scheduler)
            .IgnoreElements()
            .Select(_ => default(T))
            .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                o.OnError(new OperationCanceledException(cancellationToken)))));
}

命令式实现:

public static IObservable<T> PeriodicSequentialExecution2<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    return Observable.Create<T>(async (observer, ct) =>
    {
        using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
            ct, cancellationToken))
        {
            try
            {
                await Task.Delay(dueTime, linkedCTS.Token);
                while (true)
                {
                    var delayTask = Task.Delay(period, linkedCTS.Token);
                    var result = await action(linkedCTS.Token);
                    observer.OnNext(result);
                    await delayTask;
                }
            }
            catch (Exception ex) { observer.OnError(ex); }
        }
    });
}

cancellationToken参数可用于优雅终止生成的可观察序列。这意味着序列在终止之前等待当前运行的操作完成。如果您希望它立即终止,可能会使工作以一种即发即弃的方式运行而未被观察到,您可以像往常一样简单地处理对可观察序列的订阅。取消cancellationToken以故障状态完成的可观察序列的结果 ( OperationCanceledException)。

于 2020-11-17T03:14:26.433 回答
0

这是一个工厂函数,可以完全满足您的要求。

public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}

这是一个示例用法

Periodic(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
    {
        Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
        Thread.Sleep(500);
    });

如果你运行它,每个控制台打印将相隔大约 1.5 秒。

注意,如果您不希望第一个刻度立即运行,您可以改用这个工厂,它不会在时间跨度之后发送第一个单元。

public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}
于 2017-03-06T21:12:16.510 回答