由于这似乎是应用程序的一项专用任务,我同意自己管理用于特定任务的线程数可能是有意义的。
此外,您的流程似乎有多个阶段:
- 提供下一个要检查的地址
- 确定检查时使用的超时。使用的超时时间可能取决于几个因素,包括地址是否在之前的检查中被确定为无响应、它的响应时间通常是多少,以及正如 Dariusz 所提到的,它是否在 LAN、外联网、互联网...... .
- 执行 ping
- 处理和解释 ping 回复与以前的回复状态和累积状态(例如更新地址的统计信息,甚至可能存储它)。
- 对(重复)无响应发出“警报”
- 发出重启命令。
因此,如果您有明确的可以独立执行的阶段,使用前一阶段产生的输出,您可能会选择 SEDA(分阶段事件驱动架构)类型的解决方案,您可以在其中为每个阶段分配多个专用线程。并且阶段可以使用 Provider / Producer / Consumer 角色相互连接,用于流经阶段的特定信息项,其中有 ProducerConsumerQueues 来吸收临时不匹配(peek 负载)和自动限制(例如,太多未决的 ping 请求将阻止 ping 请求的生产者,直到执行 ping 的消费者充分赶上)。
对于“Ping 流程”的基本结构,您可能会经历以下阶段:
- “PingRequest”生产者阶段,由 IP 地址提供者提供,并使用工厂创建请求(因此工厂可以根据历史记录和 IP 地址的最后已知状态确定请求的超时)。它将请求传递给“PingRequests”的连接消费者。
- “Pinger”阶段,从其消费者队列中检索 PingRequests,执行 Ping 并将结果传递给“PingResults”的连接消费者
- “ResultProcessor”阶段,从其消费者队列中检索 PingResults,更新 IPAddress 的状态并将结果传递给“PingStatus”的连接消费者。
在第 3 阶段之后,您可能希望以相同的方式添加其他阶段以生成警报、重新启动请求等。
这些阶段中的每一个都可以分配一个专用数量的线程,并且可以非常灵活地对流程进行更改。
几个代码示例来说明:
/// <summary>
/// Coordinates and wires up the processing pipeline.
/// </summary>
public class PingModule : IConsumer<PingStatus>
{
private readonly ConcurrentDictionary<IPAddress, PingStatus> _status = new ConcurrentDictionary<IPAddress,PingStatus>();
private readonly CancellationTokenSource _cancelTokenSource;
private readonly PingRequestProducerWorkStage _requestProducer;
private readonly PingWorkStage _pinger;
private readonly PingReplyProcessingWorkStage _replyProcessor;
public PingModule(IProvider<IPAddress> addressProvider)
{
_cancelTokenSource = new CancellationTokenSource();
_requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
_pinger = new PingWorkStage(4, 10 * 2, _cancelTokenSource.Token);
_replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
// connect the pipeline.
_requestProducer.ConnectTo(_pinger);
_pinger.ConnectTo(_replyProcessor);
_replyProcessor.ConnectTo(this);
}
private PingRequest NextRequestFor(IPAddress address)
{
PingStatus curStatus;
if (!_status.TryGetValue(address, out curStatus))
return new PingRequest(address, IPStatus.Success, TimeSpan.FromMilliseconds(120));
if (curStatus.LastResult.TimedOut)
{
var newTimeOut = TimeSpan.FromTicks(curStatus.LastResult.TimedOutAfter.Ticks * 2);
return new PingRequest(address, IPStatus.TimedOut, newTimeOut);
}
else
{
var newTimeOut = TimeSpan.FromTicks(curStatus.AverageRoundtripTime + 4 * curStatus.RoundTripStandardDeviation);
return new PingRequest(address, IPStatus.Success, newTimeOut);
}
}
// ...
}
现在可以轻松修改此管道。例如,您可能决定要拥有 2 或 3 个并行的“Pinger”阶段流,其中一个服务于先前断开连接的地址,一个服务于“慢响应者”,一个服务于其余的。这可以通过将阶段 1 连接到执行此路由的消费者并将 PingRequest 传递给正确的“Pinger”来实现。
public class RequestRouter : IConsumer<PingRequest>
{
private readonly Func<PingRequest, IConsumer<PingRequest>> _selector;
public RequestRouter(Func<PingRequest, IConsumer<PingRequest>> selector)
{
this._selector = selector;
}
public void Consume(PingRequest work)
{
_selector(work).Consume(work);
}
public void Consume(PingRequest work, CancellationToken cancelToken)
{
_selector(work).Consume(work, cancelToken);
}
}
public class PingModule : IConsumer<PingStatus>
{
// ...
public PingModule(IProvider<IPAddress> addressProvider)
{
_cancelTokenSource = new CancellationTokenSource();
_requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
_disconnectedPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
_slowAddressesPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
_normalPinger = new PingWorkStage(3, 10 * 2, _cancelTokenSource.Token);
_requestRouter = new RequestRouter(RoutePingRequest);
_replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
// connect the pipeline
_requestProducer.ConnectTo(_requestRouter);
_disconnectedPinger.ConnectTo(_replyProcessor);
_slowAddressesPinger.ConnectTo(_replyProcessor);
_normalPinger.ConnectTo(_replyProcessor);
_replyProcessor.ConnectTo(this);
}
private IConsumer<PingRequest> RoutePingRequest(PingRequest request)
{
if (request.LastKnownStatus != IPStatus.Success)
return _disconnectedPinger;
if (request.PingTimeOut > TimeSpan.FromMilliseconds(500))
return _slowAddressesPinger;
return _normalPinger;
}
// ...
}