2

我有一个界面:

public interface IRunner
{
    Task<bool> Run(object o);
}

我有一些使用异步实现它的类,例如:

public class Runner1 : IRunner
{
    public async Task<bool> Run(object o)
    {
        var num  = await SomeExternalAsyncFunc(o);
        return num < 12;
    }
}

我需要实现一个在所有 Runners 类上并行运行的函数,并且只有当它们都返回 true 时才返回 true。在阅读了这个这个之后,我想出了以下实现:

public class RunMannager
{
    public async Task<bool> Run(ConcurrentBag<IRunner> runnersBag, object o)
    {
        var results = new ConcurrentBag<bool>();
        var tasks = runnersBag.Select(async runner => results.Add(await runner.Run(o)));
        await Task.WhenAll(tasks);
        return results.All(result => result);
     }
}

但是,我对这个实现有两个问题:

  • 我希望如果其中一位跑步者已经返回 false,则该函数不应等待所有其他跑步者。

  • 有些跑步者可能永远不会回来,我想使用超时。如果跑步者在 10 秒内没有返回任何内容,则将视为返回 true。

也许使用响应式扩展会有所帮助?

4

2 回答 2

2

这是一个带有 Rx 的版本,包括超时。编写的代码将在添加了 Rx 引用的 LINQPad 中运行。您可以尝试在 RunnerFactory 方法中构建 TrueRunner、FalseRunner 和 SlowRunner 实例。

关键思想:

  • 使用SelectandToObservable()启动异步任务并将其转换为 IObservable(请参阅下文以获得更好的选择)
  • 用于Timeout()为每个任务添加超时,如果任务超时,则根据请求替换真实结果。
  • 用于Where过滤True结果 - 我们现在只收到有关错误结果的通知。
  • Any如果流中有任何元素,则返回 true,否则返回 false,因此在后续中翻转 this 的结果,Select我们就完成了。

代码:

void Main()
{
    Observable.Merge(
        RunnerFactory().Select(x => x.Run(null).ToObservable()
        .Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
        .Where(res => !res)
        .Any().Select(res => !res)
        .Subscribe(
            res => Console.WriteLine("Result: " + res),
            ex => Console.WriteLine("Error: " + ex.Message));                                           
}

public IEnumerable<IRunner> RunnerFactory()
{
    yield return new FalseRunner();
    yield return new SlowRunner();
    yield return new TrueRunner();
}

public interface IRunner
{
    Task<bool> Run(object o);
}

public class Runner : IRunner
{
    protected bool _outcome;

    public Runner(bool outcome)
    {
        _outcome = outcome;
    }

    public virtual async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);     
        return result;
    }
}

public class TrueRunner : Runner
{
    public TrueRunner() : base(true) {}
}   

public class FalseRunner : Runner
{
    public FalseRunner() : base(false) {}
}   

public class SlowRunner : Runner
{
    public SlowRunner() : base(false) {}

    public override async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(
            () => { Thread.Sleep(5000); return _outcome; });        
        return result;
    }
}   

鉴于我使用的 Runner 实现,其中的 OnError 处理程序是多余的;如果你想在你的实现中抑制 Runner 错误,你可能想要考虑一个 Catch - 你可以IObservable<bool>像我对 Timeout 所做的那样替换一个。

编辑我认为值得一提的另一件事是,使用Observable.StartAsync是启动任务的更好方法,并且还会为您提供取消支持。下面是一些修改后的代码,展示了 SlowRunner 如何支持取消。如果订阅被释放,令牌会被传入StartAsync并导致取消。Any如果检测到一个元素,这一切都会透明地发生。

void Main()
{
    var runners = GetRunners();     

    Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
                    .Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
                    .Where(res => !res)
                    .Any().Select(res => !res)
                    .Subscribe(
                        res => Console.WriteLine("Result: " + res));
}

public static IEnumerable<IRunner> GetRunners()
{
    yield return new Runner(false);
    yield return new SlowRunner(true);
}

public interface IRunner
{
    Task<bool> Run(CancellationToken ct, object o);
}

public class Runner : IRunner
{
    protected bool _outcome;

    public Runner(bool outcome)
    {
        _outcome = outcome;
    }

    public async virtual Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);
        return result;
    }
}

public class SlowRunner : Runner
{
    public SlowRunner(bool outcome) : base(outcome)
    {
    }

    public async override Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => 
        {
            for(int i=0; i<5; i++)
            {
                if(ct.IsCancellationRequested)
                {
                    Console.WriteLine("Cancelled");                     
                }
                ct.ThrowIfCancellationRequested();
                Thread.Sleep(1000);
            };
            return _outcome;
        });
        return result;
    }
}
于 2013-10-20T18:35:36.983 回答
0

怎么用Parallel.ForEach()?下面的代码应该让你明白我的意思。

你可以定义CancellationTokenSource

CancellationTokenSource cancellationToken = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.CancellationToken = cancellationToken.Token;

然后传递poParallel.ForEach

Parallel.ForEach(items, po, item =>
{
   //...
   if(num < 12)
     cancellationToken.Cancel(false);

});

return !cancellationToken.IsCancellationRequested;
于 2013-10-19T09:46:00.647 回答