5

我有一个应用程序,我可以在其中监视和控制一堆计算机(可能是 3 到 35 台左右,可能是本地计算机)。

我监控的一件事是正常运行时间/ping 状态。应用程序的目的之一是重新启动盒子,有时它们会因为其他原因重新启动。

我希望能够快速获取可 ping 的/不可 ping 的更改。

我有一个线程上的旋转循环。

在我看来,阻塞 ping 会阻止它更新一点,即使你并行运行它(防止一个盒子的 ping 阻塞另一个盒子)

(并行实现示例,注意以下只是我的想法,尚未实现,可能包含错误)

var startTime = DateTime.Now;
var period = TimeSpan.FromSeconds();
Parallel.ForEach(boxes, (box) => 
{
    var now = DateTime.Now;
    var remainingTime = (now - startTime) - period;
    while(remainingTime > TimeSpan.Zero)
    {
        box.CanPing.TryUpdate();
    }
});

TryUpdate 就像

using(ping = new Ping())
{
    var reply = ping.Send (IP);
    bool upStatus = (reply.Status == IPStatus.Success);
    this.Value = upStatus;
}

或者,我尝试使用多个 SendAsync(一次多个异步 ping)来尽快发现正常运行时间,并在 SendAsync 的回调中进行双重检查锁定

if(upStatus != this.Value)
{
    lock(_lock)//is it safe to have a non static readonly lock object, all the examples seem to use a static object but that wouldn't scale to  locking in multiple instances of the containing class object
    {
        if(upStatus != this.Value)
        {
            ...
        }
    }
}

这是一个可怕的内存泄漏,但这可能是因为我太快地进行了太多异步 ping 调用(每个都带有一个线程),并且没有处理 ping。如果我一次将自己限制为每台计算机 3 台,或者在中间暂停更长的时间,然后 Dispose() ping 你认为这是个好主意吗?

更好的策略是什么?还有其他想法吗?

4

4 回答 4

8

这是多线程的一个特殊情况,您不需要踏板来使程序更快,您需要使其更具响应性。您的操作几乎不需要计算能力。因此,我不会害怕为每台受监控的计算机创建一个线程。sleep()无论如何,他们大部分时间都会这样做。它们应该被创建一次,因为线程创建实际上是这里最昂贵的事情。

我会像这样创建对象层次结构:

  • GUIProxy- 将处理所有 gui 操作,例如更改计算机名称旁边的通知颜色
  • HostManager- 将注册新机器,删除旧机器,执行时间检查Monitors
  • HostMonitor- 会定期、按顺序发送 ping 来检查计算机。稍后详细了解它的行为

检查算法

在 LAN 中,大多数时间 ping 在发送后 1-2 毫秒内返回。在互联网上,时间可能会有所不同。Monitor根据机器位置,我将为每个单独设置两个 ping 时间阈值。当 LAN ping 大于 5 毫秒或 Internet ping > 200 毫秒时,一个将是“警告”阈值(GUI 中的黄灯或某事)。第二个是“错误”阈值,LAN>1s 和 Internet>2s 或 sth。每个Monitor人都会发送 ping,等待答案,并在收到答案后发送另一个 ping。它应该存储lastPingSendTime和。前者用于确定延迟,后者用于检查. 当然,应该正确处理超时和其他系统/网络事件。lastPingReceiveTimecurrentPingSendTimeHostManagerMonitor

HostManager中,同样在单个线程上运行,我会检查currentPingSendTime每个监视器上的 ,并根据该监视器的阈值检查它。如果超过阈值,GUIProxy将通知在 GUI 中显示情况。

优点

  • 你自己控制线程
  • 您可以使用同步(更简单)的 ping 方法
  • Manager不会挂起,因为它异步访问监视器
  • 您可以实现一个抽象的 Monitor 接口,您可以使用它来监控其他事物,而不仅仅是计算机

缺点

  • 正确的Monitor线程实现可能并不简单
于 2013-07-11T08:09:22.107 回答
2

根据您是否需要扩展解决方案,您可以像 Dariusz 所说的那样实施状态检查(这是一种绝对合法的方法)。

这种方法只有一个缺点,可能与您的场景相关,也可能不相关:扩展到数百甚至数千个受监控的盒子或服务将导致大量线程。关于 64 位模式下的 .net 应用程序支持数千个并发线程这一事实,我不建议生成那么多工作线程。如果你给资源调度器安排这么大量的工人,资源调度器将不再是你最好的朋友。

为了获得一个能够横向扩展的解决方案,这有点困难。让我们很快回到最初的问题:您想快速监控一堆盒子并且流水线处理性能不佳。考虑到您将来可能会监视其他服务 (tcp) 也等待超时将完全杀死这种方法。

解决方案:自定义线程池或线程重用

当您处理受从默认线程池中生成线程的时间影响的特殊类型的线程时,需要解决方案来消除生成问题。考虑到能够扩展我会推荐这种方式:

使用自定义或默认线程池生成多个处于挂起状态的线程。现在你的系统想要测量几个盒子。因此:获取预热线程并获取第一个挂起/空闲的线程并将其保留给您的监控作业。在您获得线程供您使用之后,您可以给他一些实际工作方法的句柄(线程将异步调用该方法)。监控迭代完成后(可能需要一些时间)线程返回结果(好方法是回调)并将自己设置为挂起模式。

所以这只是一个带有预热线程的自定义调度程序。如果您正在使用 ManualResetEvents 构建挂起/恢复,则线程几乎可以立即使用。

还想要更多性能?

如果您仍然获得更多性能并且希望能够以更精细的方式调整您的系统,我会推荐专门的线程池(就像 zabbix 为监控所做的那样)。因此,您不只是分配一组线程,这些线程可能会调用自定义方法来检查一个框是否可以通过 ping 或 tcp 访问,而是为每个 Monitoring 类型分配一个单独的池。因此,在 icmp (ping) 和 tcp Monitoring 的情况下,您将创建至少两个线程池,其中线程已经包含有关“如何检查”的基本知识。在 ping Monitor 的情况下,线程将准备好并等待初始化的 ping 实例,该实例正在等待目标检查。当您将线程从挂起状态中取出时,它会立即检查主机并返回结果。之后它准备睡眠(在这种情况下,已经为下一次运行初始化了环境)。如果您以一种好的方式实现这一点,您甚至可以重用套接字等资源。

总而言之,这种方法使您能够监控 3、35 甚至数百个盒子而不会陷入麻烦。当然,监控仍然是有限的,你不应该派生数千个预热的线程。这不是背后的想法:想法是您已经定义了准备好使用的最大线程数,只是等待让目的地检查。在为多个主机启动监控时,您不必处理分叉问题 - 如果您监控的数量超过了您定义的并发允许的数量,您只需要处理排队问题(这可能比默认情况下的 Parallel.ForEach 高得多spawns 每个核心最多一个线程!检查方法的重载以增加此数量。)

绝对优化

如果您仍然愿意进一步改进系统,请获取您的调度程序和资源计划程序,而不仅仅是一个预热线程数。给他限制,例如最少 4 个,最多 42 个线程。调度程序会考虑在这些边界内启动和停止其他线程。如果您的系统在夜间降低监控率并且您不希望挂起的线程徘徊,这将很有用。

这将是 A+ 实现,因为您不仅能够立即从冷状态开始监控至少一些主机,而且快速启动许多主机 - 您还将返还您真正不需要很长时间的资源。

于 2013-07-17T21:04:37.703 回答
0

我知道这是一种解决编码问题的方法,但是您是否考虑过使用 NagiOS 或 Smokeping 或其他开源监控解决方案?这些可以快速检测连接性下降,并且可能具有您可能不想自己挖掘的许多其他功能。

于 2013-07-16T01:34:39.410 回答
0

由于这似乎是应用程序的一项专用任务,我同意自己管理用于特定任务的线程数可能是有意义的。

此外,您的流程似乎有多个阶段:

  1. 提供下一个要检查的地址
  2. 确定检查时使用的超时。使用的超时时间可能取决于几个因素,包括地址是否在之前的检查中被确定为无响应、它的响应时间通常是多少,以及正如 Dariusz 所提到的,它是否在 LAN、外联网、互联网...... .
  3. 执行 ping
  4. 处理和解释 ping 回复与以前的回复状态和累积状态(例如更新地址的统计信息,甚至可能存储它)。
  5. 对(重复)无响应发出“警报”
  6. 发出重启命令。

因此,如果您有明确的可以独立执行的阶段,使用前一阶段产生的输出,您可能会选择 SEDA(分阶段事件驱动架构)类型的解决方案,您可以在其中为每个阶段分配多个专用线程。并且阶段可以使用 Provider / Producer / Consumer 角色相互连接,用于流经阶段的特定信息项,其中有 ProducerConsumerQueues 来吸收临时不匹配(peek 负载)和自动限制(例如,太多未决的 ping 请求将阻止 ping 请求的生产者,直到执行 ping 的消费者充分赶上)。

对于“Ping 流程”的基本结构,您可能会经历以下阶段:

  1. “PingRequest”生产者阶段,由 IP 地址提供者提供,并使用工厂创建请求(因此工厂可以根据历史记录和 IP 地址的最后已知状态确定请求的超时)。它将请求传递给“PingRequests”的连接消费者。
  2. “Pinger”阶段,从其消费者队列中检索 PingRequests,执行 Ping 并将结果传递给“PingResults”的连接消费者
  3. “ResultProcessor”阶段,从其消费者队列中检索 PingResults,更新 IPAddress 的状态并将结果传递给“PingStatus”的连接消费者。

在第 3 阶段之后,您可能希望以相同的方式添加其他阶段以生成警报、重新启动请求等。

这些阶段中的每一个都可以分配一个专用数量的线程,并且可以非常灵活地对流程进行更改。

几个代码示例来说明:

/// <summary>
/// Coordinates and wires up the processing pipeline.
/// </summary>
public class PingModule : IConsumer<PingStatus>
{
    private readonly ConcurrentDictionary<IPAddress, PingStatus> _status = new ConcurrentDictionary<IPAddress,PingStatus>();
    private readonly CancellationTokenSource _cancelTokenSource;
    private readonly PingRequestProducerWorkStage _requestProducer;
    private readonly PingWorkStage _pinger;
    private readonly PingReplyProcessingWorkStage _replyProcessor;

    public PingModule(IProvider<IPAddress> addressProvider)
    {
        _cancelTokenSource = new CancellationTokenSource();

        _requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
        _pinger = new PingWorkStage(4, 10 * 2, _cancelTokenSource.Token);
        _replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);

        // connect the pipeline.
        _requestProducer.ConnectTo(_pinger);
        _pinger.ConnectTo(_replyProcessor);
        _replyProcessor.ConnectTo(this);
    }

    private PingRequest NextRequestFor(IPAddress address)
    {
        PingStatus curStatus;
        if (!_status.TryGetValue(address, out curStatus))
            return new PingRequest(address, IPStatus.Success, TimeSpan.FromMilliseconds(120));
        if (curStatus.LastResult.TimedOut)
        {
            var newTimeOut = TimeSpan.FromTicks(curStatus.LastResult.TimedOutAfter.Ticks * 2);
            return new PingRequest(address, IPStatus.TimedOut, newTimeOut);
        }
        else
        {
            var newTimeOut = TimeSpan.FromTicks(curStatus.AverageRoundtripTime + 4 * curStatus.RoundTripStandardDeviation);
            return new PingRequest(address, IPStatus.Success, newTimeOut);
        }
    }
    // ...
}

现在可以轻松修改此管道。例如,您可能决定要拥有 2 或 3 个并行的“Pinger”阶段流,其中一个服务于先前断开连接的地址,一个服务于“慢响应者”,一个服务于其余的。这可以通过将阶段 1 连接到执行此路由的消费者并将 PingRequest 传递给正确的“Pinger”来实现。

public class RequestRouter : IConsumer<PingRequest>
{
    private readonly Func<PingRequest, IConsumer<PingRequest>> _selector;

    public RequestRouter(Func<PingRequest, IConsumer<PingRequest>> selector)
    {
        this._selector = selector;
    }
    public void Consume(PingRequest work)
    {
        _selector(work).Consume(work);
    }
    public void Consume(PingRequest work, CancellationToken cancelToken)
    {
        _selector(work).Consume(work, cancelToken);
    }
}

public class PingModule : IConsumer<PingStatus>
{
    // ...
    public PingModule(IProvider<IPAddress> addressProvider)
    {
        _cancelTokenSource = new CancellationTokenSource();

        _requestProducer = new PingRequestProducerWorkStage(1, addressProvider, NextRequestFor, _cancelTokenSource.Token);
        _disconnectedPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
        _slowAddressesPinger = new PingWorkStage(2, 10 * 2, _cancelTokenSource.Token);
        _normalPinger = new PingWorkStage(3, 10 * 2, _cancelTokenSource.Token);
        _requestRouter = new RequestRouter(RoutePingRequest);
        _replyProcessor = new PingReplyProcessingWorkStage(2, 10 * 2, _cancelTokenSource.Token);

        // connect the pipeline
        _requestProducer.ConnectTo(_requestRouter);
        _disconnectedPinger.ConnectTo(_replyProcessor);
        _slowAddressesPinger.ConnectTo(_replyProcessor);
        _normalPinger.ConnectTo(_replyProcessor);
        _replyProcessor.ConnectTo(this);
    }
    private IConsumer<PingRequest> RoutePingRequest(PingRequest request)
    {
        if (request.LastKnownStatus != IPStatus.Success)
            return _disconnectedPinger;
        if (request.PingTimeOut > TimeSpan.FromMilliseconds(500))
            return _slowAddressesPinger;
        return _normalPinger;
    }
    // ...
} 
于 2013-07-15T16:45:45.743 回答