这是一个带有 Rx 的版本,包括超时。编写的代码将在添加了 Rx 引用的 LINQPad 中运行。您可以尝试在 RunnerFactory 方法中构建 TrueRunner、FalseRunner 和 SlowRunner 实例。
关键思想:
- 使用
Select
andToObservable()
启动异步任务并将其转换为 IObservable(请参阅下文以获得更好的选择)
- 用于
Timeout()
为每个任务添加超时,如果任务超时,则根据请求替换真实结果。
- 用于
Where
过滤True
结果 - 我们现在只收到有关错误结果的通知。
Any
如果流中有任何元素,则返回 true,否则返回 false,因此在后续中翻转 this 的结果,Select
我们就完成了。
代码:
void Main()
{
Observable.Merge(
RunnerFactory().Select(x => x.Run(null).ToObservable()
.Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res),
ex => Console.WriteLine("Error: " + ex.Message));
}
public IEnumerable<IRunner> RunnerFactory()
{
yield return new FalseRunner();
yield return new SlowRunner();
yield return new TrueRunner();
}
public interface IRunner
{
Task<bool> Run(object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public virtual async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class TrueRunner : Runner
{
public TrueRunner() : base(true) {}
}
public class FalseRunner : Runner
{
public FalseRunner() : base(false) {}
}
public class SlowRunner : Runner
{
public SlowRunner() : base(false) {}
public override async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(
() => { Thread.Sleep(5000); return _outcome; });
return result;
}
}
鉴于我使用的 Runner 实现,其中的 OnError 处理程序是多余的;如果你想在你的实现中抑制 Runner 错误,你可能想要考虑一个 Catch - 你可以IObservable<bool>
像我对 Timeout 所做的那样替换一个。
编辑我认为值得一提的另一件事是,使用Observable.StartAsync
是启动任务的更好方法,并且还会为您提供取消支持。下面是一些修改后的代码,展示了 SlowRunner 如何支持取消。如果订阅被释放,令牌会被传入StartAsync
并导致取消。Any
如果检测到一个元素,这一切都会透明地发生。
void Main()
{
var runners = GetRunners();
Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
.Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res));
}
public static IEnumerable<IRunner> GetRunners()
{
yield return new Runner(false);
yield return new SlowRunner(true);
}
public interface IRunner
{
Task<bool> Run(CancellationToken ct, object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public async virtual Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class SlowRunner : Runner
{
public SlowRunner(bool outcome) : base(outcome)
{
}
public async override Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() =>
{
for(int i=0; i<5; i++)
{
if(ct.IsCancellationRequested)
{
Console.WriteLine("Cancelled");
}
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
};
return _outcome;
});
return result;
}
}