6

我计划开发一个系统,其中包含数以万计的对象,每个对象将有多达 42 个(但更可能减少大约 4 或 5 个)单独的操作,它们可能会定期执行。我还计划编写代码来停用计时器,直到对象开始使用。空闲时,每个对象只需要 1 个计时器,但当处于活动状态时,其他计时器将立即启动。起初,对象的数量很少,可能只有几百个,但我预计它会成倍增长,并在几个月内开始达到数万个。

所以,我非常担心我将为计时器和这些对象编写的代码的效率。我可以在三个级别上编写此应用程序,所有这些级别都将成功执行所需的任务。另外,我计划在四核服务器上运行这个系统,所以我想尽可能使用多线程。

为此,我决定使用 System.Timers.Timer 类,它为每个 elapse 事件触发一个新线程。

这些是我正在考虑的 3 个级别:

  1. 一个计时器操作整个应用程序,它遍历每个对象,检查是否需要触发任何其他操作,如果需要,则运行它们,然后继续执行下一个操作。

  2. 多层计时器,其中每个对象都有一个主计时器,它检查对象可能需要执行的所有功能,运行任何准备好的功能,然后将下一个计时器间隔设置为下一个所需的操作时间。

  3. 递归层计时器,其中每个对象中的每个操作都有自己的计时器,该计时器将被触发,然后设置为在下次可用时运行。

选项 1 的问题在于,对于如此多的对象和动作,以这种方式经过的一个单一计时器可能会运行 20 多秒(同时它执行了几百万行循环代码),这可能应该每 1 秒滴答一次. 如果对象不保持同步,则系统可能无法正常工作。

选项 2 的问题在于,它比选项 3 更难编写,但不会差很多,这也意味着系统上可能运行 10,000 多个计时器(每个对象一个),每个都创建和销毁线程像没有人的事一样过去(我不确定这是否是一个问题)。在这种情况下,每个计时器必须每秒至少触发一次,可能运行几百行代码(在极端情况下可能多达一千行)。

选项 3 的问题在于可能会引入系统中的计时器数量之多。我说的是平均 10,000 多个计时器,可能同时运行近 100,000 多个计时器。但是,每个 elapse 事件可能只需要运行 50 行或更少的代码,这使得它们非常短。经过事件的延迟在一个极端的百分之一秒和另一个极端的五分钟之间,平均可能在 1 秒左右。

我精通 Visual Basic .NET,并打算用它来编写它,但我也可以回到我的高中时代并尝试用 C++ 编写它以提高效率,如果它会产生很大的不同(请让我知道你是否有任何关于语言之间代码效率的资料)。还玩弄在集群 Linux 服务器而不是我的四核 Windows 服务器上运行它的概念,但我不确定我是否可以让我的任何 .NET 应用程序在这样的 linux 集群上运行(希望有任何信息也是如此)。

这个主题要回答的主要问题是:

我使用选项 1、2 还是 3,为什么?

〜考虑评论后编辑〜

所以第四个选项涉及带有自旋锁的计时器轮。这是一个工作类:

Public Class Job
Private dFireTime As DateTime
Private objF As CrossAppDomainDelegate
Private objParams() As Object

Public Sub New(ByVal Func As CrossAppDomainDelegate, ByVal Params() As Object, ByVal FireTime As DateTime)
    objF = Func
    dFireTime = FireTime
    objParams = Params
End Sub

Public ReadOnly Property FireTime()
    Get
        Return dFireTime
    End Get
End Property

Public ReadOnly Property Func() As CrossAppDomainDelegate
    Get
        Return objF
    End Get
End Property

Public ReadOnly Property Params() As Object()
    Get
        Return objParams
    End Get
End Property
End Class

然后是主循环实现:

Private Tasks As LinkedList(Of Job)

Private Sub RunTasks()
    While True
        Dim CurrentTime as DateTime = Datetime.Now            

        If Not Tasks.Count = 0 AndAlso Tasks(0).FireTime > CurrentTime Then
            Dim T As Job = Tasks(0)
            Tasks.RemoveFirst()
            T.Func.Invoke()
        Else
            Dim MillisecondDif As Double

            MillisecondDif = Tasks(0).FireTime.Subtract(CurrentTime).Milliseconds
            If MillisecondDif > 30 Then
                Threading.Thread.Sleep(MillisecondDif)
            End If
        End If

    End While
End Sub

我说得对吗?

EpicClanWars.com

~编辑2~

将“任务”这个词换成了“工作”,这样人们就可以停止抱怨它了;)

~编辑3~

添加了用于跟踪时间的变量并确保在需要时发生自旋循环

4

4 回答 4

5

编辑:我记得有趣的采访绝对值得一看:Arun Kishan:Windows 7 内部 - 告别 Windows 内核调度程序锁

正如@Steven Sudit 所说,我再次警告:仅将它用作计时器轮如何工作的演示以及在实施它时必须关心的一些任务。不作为参考实现。在现实世界中,您必须编写更复杂的逻辑来考虑可用资源、调度逻辑等。


这里是 Steven Sudit 所说的好点(详情请阅读帖子评论):

1)选择正确的结构来保存你的工作列表(这通常取决于):

  • SortedList<>(或 SortedDictionary<>)在内存消耗和索引方面很好,但必须实现同步访问

  • ConcurrentQueue<> 将帮助您避免锁定,但您必须实现排序。它也非常节省内存

  • LinkedList<> 在插入和检索方面很好(无论如何我们只需要头部),但需要同步访问(通过它很容易通过无锁实现)并且内存效率不高,因为它存储了两个引用(上一个/下一个)。但是,当您拥有数百万个作业时,它就会成为一个问题,因此它们都占用了大量的内存。

但:

我完全同意@Steven:

没关系:这些都不适合。正确的答案是使用常规队列并自己维护其顺序,因为我们通常只需要从头部或尾部访问它。

通常,我建议使用库中功能最完整的集合,但这不适用于这里,因为这是系统级代码。我们需要自己动手,要么从头开始,要么在功能不太丰富的集合之上

2)为了简化同时作业的处理逻辑,您可以将委托列表(例如,通过 ConcurrentQueue 使其无锁)添加到原始 Job 类中,这样当您同时需要另一个工作时,您只需添加另一个委托即可开始。

@史蒂文:

如果两个任务实际上被安排在同一时间(无论是实际还是有效),这是一种正常情况,不需要使我们的数据结构复杂化。换句话说,我们不需要对同时进行的作业进行分组,这样我们就必须遍历两个不同的集合;我们可以让它们相邻

3)启动/停止调度程序不是那么直接,因此可能导致错误。相反,您可以在使用超时时等待事件。

@史蒂文:

这样,它要么在下一个作业准备好时唤醒,要么在头部之前插入新作业时唤醒。在后一种情况下,它可能需要现在运行它或设置不同的等待。例如,如果有 100 个作业都安排在同一时刻,我们能做的最好的事情就是将它们全部排队。

如果我们需要提供优先级,这是优先调度队列和生产者/消费者关系中的多个池的工作,但它仍然不能证明启动/停止调度程序是合理的。调度程序应始终处于开启状态,运行在有时会放弃核心的单个循环中

4)关于使用刻度:

@史蒂文:

坚持一种类型的刻度很好,但混合和匹配变得丑陋,特别是因为它依赖于硬件。我确信滴答声会比毫秒稍快,因为它存储前者并且必须除以一个常数才能得到后者。这个操作是否最终代价高昂是另一回事,但我可以使用刻度来避免风险。

我的想法:

另一个好点,我同意你的看法。但有时除以常数会变得昂贵,而且速度并不像看起来那么快。但是当我们谈论 100 000 的 DateTimes 时没关系,你是对的,谢谢你的指点。

5)“管理资源”:

@史蒂文:

我要强调的问题是对 GetAvailableThreads 的调用既昂贵又幼稚。在您甚至可以使用它之前,答案就已经过时了。如果我们真的想跟踪,我们可以通过从使用 Interlocked.Increment/Decrement 的包装器调用作业来获取初始值并保持运行计数。即使这样,它也假定程序的其余部分没有使用线程池。如果我们真的想要精细控制,那么这里的正确答案是滚动我们自己的线程池

我绝对同意调用 GetAvailableThreads 是通过 CorGetAvailableThreads 监控可用资源的幼稚方法,但成本并不高。我想证明需要管理资源,并且似乎选择了坏例子。

源代码示例中提供的任何方式都不能被视为监视可用资源的正确方法。我只是想证明你必须考虑一下。Thru 可能没有像示例那样编写好的代码。

6) 使用 Interlocked.CompareExchange:

@史蒂文:

不,这不是一种常见的模式。最常见的模式是短暂锁定。不太常见的是将变量标记为 volatile。不太常见的是使用 VolatileRead 或 MemoryBarrier。即使 Richter 这样做,以这种方式使用 Interlocked.CompareExchange 也是晦涩难懂的。在没有解释性注释的情况下使用它绝对会造成混淆,因为“比较”这个词暗示我们正在进行比较,而实际上我们没有。

你是对的,我必须指出它的用法。


using System;
using System.Threading;

// Job.cs

// WARNING! Your jobs (tasks) have to be ASYNCHRONOUS or at least really short-living
// else it will ruin whole design and ThreadPool usage due to potentially run out of available worker threads in heavy concurrency

// BTW, amount of worker threads != amount of jobs scheduled via ThreadPool
// job may waits for any IO (via async call to Begin/End) at some point 
// and so free its worker thread to another waiting runner

// If you can't achieve this requirements then just use usual Thread class
// but you will lose all ThreadPool's advantages and will get noticeable overhead

// Read http://msdn.microsoft.com/en-us/magazine/cc164139.aspx for some details

// I named class "Job" instead of "Task" to avoid confusion with .NET 4 Task 
public class Job
{
    public DateTime FireTime { get; private set; }

    public WaitCallback DoAction { get; private set; }
    public object Param { get; private set; }

    // Please use UTC datetimes to avoid different timezones problem
    // Also consider to _never_ use DateTime.Now in repeat tasks because it significantly slower 
    // than DateTime.UtcNow (due to using TimeZone and converting time according to it)

    // Here we always work with with UTC
    // It will save you a lot of time when your project will get jobs (tasks) posted from different timezones
    public static Job At(DateTime fireTime, WaitCallback doAction, object param = null)
    {
        return new Job {FireTime = fireTime.ToUniversalTime(), DoAction = doAction, Param = param};
    }

    public override string ToString()
    {
        return string.Format("{0}({1}) at {2}", DoAction != null ? DoAction.Method.Name : string.Empty, Param,
                             FireTime.ToLocalTime().ToString("o"));
    }
}

 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 using System.Threading;

// Dispatcher.cs

// Take a look at System.Runtime IOThreadTimer.cs and IOThreadScheduler.cs
// in Microsoft Reference Source, its interesting reading

public class Dispatcher
{
    // You need sorted tasks by fire time. I use Ticks as a key to gain some speed improvements during checks
    // There are maybe more than one task in same time
    private readonly SortedList<long, List<Job>> _jobs;

    // Synchronization object to access _jobs (and _timer) and make it thread-safe
    // See comment in ScheduleJob about locking
    private readonly object _syncRoot;

    // Queue (RunJobs method) is running flag
    private int _queueRun;

    // Flag to prevent pollute ThreadPool with many times scheduled JobsRun
    private int _jobsRunQueuedInThreadPool;

    // I'll use Stopwatch to measure elapsed interval. It is wrapper around QueryPerformanceCounter
    // It does not consume any additional resources from OS to count

    // Used to check how many OS ticks (not DateTime.Ticks!) elapsed already
    private readonly Stopwatch _curTime;

    // Scheduler start time. It used to build time delta for job
    private readonly long _startTime;

    // System.Threading.Timer to schedule next active time
    // You have to implement syncronized access as it not thread-safe
    // http://msdn.microsoft.com/en-us/magazine/cc164015.aspx
    private readonly Timer _timer;

    // Minimum timer increment to schedule next call via timer instead ThreadPool
    // Read http://www.microsoft.com/whdc/system/pnppwr/powermgmt/Timer-Resolution.mspx
    // By default it around 15 ms
    // If you want to know it exactly use GetSystemTimeAdjustment via Interop ( http://msdn.microsoft.com/en-us/library/ms724394(VS.85).aspx )
    // You want TimeIncrement parameter from there
    private const long MinIncrement = 15 * TimeSpan.TicksPerMillisecond;

    // Maximum scheduled jobs allowed per queue run (specify your own suitable value!)
    // Scheduler will add to ThreadPool queue (and hence count them as processed) no more than this constant

    // This is balance between how quick job will be scheduled after it time elapsed in one side, and 
    // how long JobsList will be blocked and RunJobs owns its thread from ThreadPool
    private const int MaxJobsToSchedulePerCheck = 10;

    // Queue length
    public int Length
    {
        get
        {
            lock (_syncRoot)
            {
                return _jobs.Count;
            }
        }
    }

    public Dispatcher()
    {
        _syncRoot = new object();

        _timer = new Timer(RunJobs);

        _startTime = DateTime.UtcNow.Ticks;
        _curTime = Stopwatch.StartNew();

        _jobs = new SortedList<long, List<Job>>();
    }


    // Is dispatcher still working
    // Warning! Queue ends its work when no more jobs to schedule but started jobs can be still working
    public bool IsWorking()
    {
        return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
    }

    // Just handy method to get current jobs list
    public IEnumerable<Job> GetJobs()
    {
        lock (_syncRoot)
        {
            // We copy original values and return as read-only collection (thread-safety reasons)
            return _jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
        }
    }

    // Add job to scheduler queue (schedule it)
    public void ScheduleJob(Job job)
    {
        // WARNING! This will introduce bottleneck if you have heavy concurrency. 
        // You have to implement lock-free solution to avoid botleneck but this is another complex topic.
        // Also you can avoid lock by using Jeffrey Richter's ReaderWriterGateLock (http://msdn.microsoft.com/en-us/magazine/cc163532.aspx)
        // But it can introduce significant delay under heavy load (due to nature of ThreadPool)
        // I recommend to implement or reuse suitable lock-free algorithm. 
        // It will be best solution in heavy concurrency (if you have to schedule large enough job count per second)
        // otherwise lock or maybe ReaderWriterLockSlim is cheap enough
        lock (_syncRoot)
        {
            // We'll shift start time to quick check when it pasts our _curTime
            var shiftedTime = job.FireTime.Ticks - _startTime;

            List<Job> jobs;
            if (!_jobs.TryGetValue(shiftedTime, out jobs))
            {
                jobs = new List<Job> {job};
                _jobs.Add(shiftedTime, jobs);
            }
            else jobs.Add(job);


            if (Interlocked.CompareExchange(ref _queueRun, 1, 0) == 0)
            {
                // Queue not run, schedule start
                Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0);
                ThreadPool.QueueUserWorkItem(RunJobs);
            }
            else 
            {
                // else queue already up and running but maybe we need to ajust start time
                // See detailed comment in RunJobs

                long firetime = _jobs.Keys[0];
                long delta = firetime - _curTime.Elapsed.Ticks;

                if (delta < MinIncrement)
                {
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else 
                {
                    Console.WriteLine("DEBUG: Wake up time changed. Next event in {0}", TimeSpan.FromTicks(delta));
                    _timer.Change(delta/TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }

        }
    }


    // Job runner
    private void RunJobs(object state)
    {
        // Warning! Here I block list until entire process done, 
        // maybe better will use ReadWriterLockSlim or somewhat (e.g. lock-free)
        // as usually "it depends..."

        // Here processing is really fast (a few operation only) so until you have to schedule many jobs per seconds it does not matter
        lock (_syncRoot)
        {
            // We ready to rerun RunJobs if needed
            Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 0, 1);

            int availWorkerThreads;
            int availCompletionPortThreads;

            // Current thread stats
            ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);

            // You can check max thread limits by
            // ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);

            int jobsAdded = 0;

            while (jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1 && _jobs.Count > 0)
            {
                // SortedList<> implemented as two arrays for keys and values so indexing on key/value will be fast
                // First element
                List<Job> curJobs = _jobs.Values[0];
                long firetime = _jobs.Keys[0];

                // WARNING! Stopwatch ticks are different from DateTime.Ticks
                // so we use _curTime.Elapsed.Ticks instead of _curTime.ElapsedTicks

                // Each tick in the DateTime.Ticks value represents one 100-nanosecond interval. 
                // Each tick in the ElapsedTicks value represents the time interval equal to 1 second divided by the Frequency.
                if (_curTime.Elapsed.Ticks <= firetime) break;

                while (curJobs.Count > 0 &&  jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1)
                {
                    var job = curJobs[0];

                    // Time elapsed and we ready to start job
                    if (job.DoAction != null)
                    {
                        // Schedule new run

                        // I strongly recommend to look at new .NET 4 Task class because it give superior solution for managing Tasks
                        // e.g. cancel run, exception handling, continuation, etc
                        ThreadPool.QueueUserWorkItem(job.DoAction, job);
                        ++jobsAdded;

                        // It may seems that we can just decrease availWorkerThreads by 1 
                        // but don't forget about started jobs they can also consume ThreadPool's threads
                        ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
                    }

                    // Remove job from list of simultaneous jobs
                    curJobs.Remove(job);
                }

                // Remove whole list if its empty
                if (curJobs.Count < 1) _jobs.RemoveAt(0);
            }

            if (_jobs.Count > 0)
            {
                long firetime = _jobs.Keys[0];

                // Time to next event
                long delta = firetime - _curTime.Elapsed.Ticks; 

                if (delta < MinIncrement) 
                {
                    // Schedule next queue check via ThreadPool (immediately)
                    // It may seems we start to consume all resouces when we run out of available threads (due to "infinite" reschdule)
                    // because we pass thru our while loop and just reschedule RunJobs
                    // but this is not right because before RunJobs will be started again
                    // all other thread will advance a bit and maybe even complete its task
                    // so it safe just reschedule RunJobs and hence wait when we get some resources
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else // Schedule next check via timer callback
                {
                    Console.WriteLine("DEBUG: Next event in {0}", TimeSpan.FromTicks(delta)); // just some debug output
                    _timer.Change(delta / TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }
            else // Shutdown the queue, no more jobs
            {
                Console.WriteLine("DEBUG: Queue ends");
                Interlocked.CompareExchange(ref _queueRun, 0, 1); 
            }
        }
    }
}

快速使用示例:

    // Test job worker
    static void SomeJob(object param)
    {
        var job = param as Job;
        if (job == null) return;

        Console.WriteLine("Job started: {0}, [scheduled to: {1}, param: {2}]", DateTime.Now.ToString("o"),
                          job.FireTime.ToLocalTime().ToString("o"), job.Param);
    }

    static void Main(string[] args)
    {
        var curTime = DateTime.UtcNow;
        Console.WriteLine("Current time: {0}", curTime.ToLocalTime().ToString("o"));
        Console.WriteLine();

        var dispatcher = new Dispatcher();

        // Schedule +10 seconds to future
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:1"));
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:2"));

        // Starts almost immediately
        dispatcher.ScheduleJob(Job.At(curTime - TimeSpan.FromMinutes(1), SomeJob, "past"));

        // And last job to test
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(25), SomeJob, "+25 sec"));

        Console.WriteLine("Queue length: {0}, {1}", dispatcher.Length, dispatcher.IsWorking()? "working": "done");
        Console.WriteLine();

        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
        Console.WriteLine();

        Console.ReadLine();

        Console.WriteLine(dispatcher.IsWorking()?"Dispatcher still working": "No more jobs in queue");

        Console.WriteLine();
        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);

        Console.ReadLine();
    }

希望它会有所帮助。


@Steven Sudit 指出了一些问题,所以我在这里尝试给出我的愿景。

1) 我不建议在此处或其他任何地方使用 SortedList,因为它是过时的 .NET 1.1 类

SortedList<>无论如何都不会过时。它仍然存在于 .NET 4.0 中,并在将泛型引入语言时在.NET 2.0中引入。我看不出将它从 .NET 中删除的任何意义。

但我试图回答的真正问题是:什么数据结构可以按排序顺序存储值,并且在存储索引方面将是有效的。有两种合适的即用型数据结构:SortedDictionary<>SortedList<>这里有一些关于如何选择的信息。我只是不想用我自己的代码浪费实现并隐藏主要算法。在这里我可以实现优先级数组或其他东西,但它需要更多的代码行。我看不出有任何理由不在这里使用 SortedList<> ...

顺便说一句,我不明白你为什么不推荐它?原因是什么

2) 一般来说,对于同时发生的事件,不需要用特殊情况使代码复杂化。

当@Jrud 说他可能会有很多任务要安排时,我认为他们可能有大量的并发,所以我演示了如何解决它。但我的观点是:即使你的并发性很低,你仍然有机会同时获得事件。这在多线程环境中或当有许多源想要安排作业时也很容易实现。

联锁功能没有那么复杂、便宜,而且由于 .NET 4.0 内联,所以在这种情况下添加保护没有问题。

3) IsWorking 方法应该只使用内存屏障,然后直接读取值。

我不太确定你是对的。我建议阅读两篇不错的文章:第 4 部分: Joseph Albahari 的 C# 中线程的高级线程和锁如何锁定?杰夫·莫泽。当然,Jeffrey Richter 的 CLR 的第 28 章(原始线程同步结构)通过 C#(第 3 版)。

这里有一些问题:

MemoryBarrier 方法不访问内存,但它强制在调用 MemoryBarrier 之前完成任何较早的程序顺序加载和存储。它还强制在调用 MemoryBarrier 之后完成任何以后的程序顺序加载和存储。MemoryBarrier 远不如其他两种方法有用

重要我知道这可能会很混乱,所以让我把它总结为一个简单的规则:当线程通过共享内存相互通信时,通过调用 VolatileWrite 写入最后一个值,并通过调用 VolatileRead 读取第一个值。

我还推荐:英特尔® 64 和 IA-32 架构软件开发人员手册,如果您认真的话。

所以我不在我的代码中使用 VolatileRead/VolatileWrite 也不是 volatile 关键字,我不认为 Thread.MemoryBarrier 在这里会更好。也许你可以指出我想念的东西?一些文章或深入讨论?

4) GetJobs 方法看起来可以锁定很长一段时间。有必要吗?

首先,它只是一种方便的方法,有时至少需要将所有任务排入队列以进行调试。

但你是不对的。正如我在代码注释中提到的 SortedList<> 实现为两个数组,您可以通过引用源或仅通过在反射器中查看来检查这一点。这里有一些来自参考来源的评论:

// A sorted list internally maintains two arrays that store the keys and
// values of the entries.  

我从 .NET 4.0 获得,但自 2-3.5 以来变化不大

所以我的代码:

_jobs.Values.SelectMany(list => list).ToList().AsReadOnly();

涉及以下内容:

  • 遍历 List 引用数组中的值。索引数组非常快。
  • 遍历每个 List (在内部也作为数组实现)。它也非常快。
  • 构建新的引用列表(通过 ToList())也非常快(只是动态数组)(.NET 具有非常可靠和快速的实现)
  • 构建只读包装器(没有副本,只是迭代器包装器)

因此,我们只是展平了对 Job 对象的引用的只读列表。即使您有数百万个任务,它也非常快。试着衡量自己。

我添加它以显示执行周期期间发生的任何方式(出于调试目的),但我认为它可能很有用。

5) .NET 4.0 中提供了无锁队列。

我建议阅读Stephen Toub的并行编程模式和.NET Framework 4 中的线程安全集合及其性能特征这里还有许多有趣的文章。

所以我引用

ConcurrentQueue(T) 是 .NET Framework 4 中的一种数据结构,它提供对 FIFO(先进先出)有序元素的线程安全访问。在底层,ConcurrentQueue(T) 是使用小数组列表和头尾数组上的无锁操作实现的,因此它与由数组支持并依赖外部使用的 Queue(T) 完全不同监视器以提供同步。ConcurrentQueue(T) 肯定比手动锁定 Queue(T) 更安全和方便,但需要进行一些实验来确定这两种方案的相对性能。在本节的其余部分,我们将手动锁定的 Queue(T) 称为自包含类型,称为 SynchronizedQueue(T)。

它没有任何方法来维护有序队列。没有任何新的线程安全集合,它们都维护无序集合。但是阅读原始@Jrud 描述,我认为我们必须维护需要触发任务的有序时间列表。我错了吗?

6)我不会费心启动和停止调度程序;让它睡到下一份工作

你知道制作睡眠线程池线程的好方法吗?你将如何实施它?

我认为调度员在不处理任何任务并安排工作唤醒它时会“睡觉”。无论如何,没有特殊的处理让它进入睡眠或唤醒,所以在我看来这个过程等于“睡眠”。

如果您告诉我应该在没有可用作业的情况下通过 ThreadPool 重新安排 RunJobs,而您错了,它将消耗太多资源并可能影响已启动的作业。自己试试。当我们可以轻松避免时,为什么要做不必要的工作。

7) 不必担心不同种类的滴答声,你可以坚持毫秒。

你看起来不太对劲。你要么坚持蜱虫,要么完全不关心它。检查 DateTime 实现,每次访问毫秒属性都涉及将内部表示(以刻度为单位)转换为毫秒,包括除法。这会损害旧(奔腾级)计算机的性能(我自己测量,你也可以)。

总的来说,我会同意你的看法。我们在这里不关心表示,因为它不会给我们带来明显的性能提升。

这只是我的习惯。我在最近的项目中处理了数十亿个 DateTime,因此对其进行了相应的编码。在我的项目中,按刻度和 DateTime 的其他组件进行处理之间存在显着差异。

8) 尝试跟踪可用线程似乎不太可能有效

我只是想证明你必须关心它。在现实世界中,您必须实现远离我调度和监控资源的直接逻辑。

我想演示计时器轮算法并指出作者在实现它时必须考虑的一些问题。

你是绝对正确的,我必须警告它。我认为“快速原型”就足够了。我的解决方案无论如何都不能用于生产。

于 2010-10-16T15:05:42.070 回答
3

以上都不是。标准解决方案是保留一个事件列表,这样每个事件都指向下一个要发生的事件。然后,您使用单个计时器并让它仅在下一个事件时及时唤醒。

编辑

貌似这叫做计时器轮

编辑

正如 Sentinel 指出的,应该将事件分派到线程池。这些事件的处理程序应该尽快完成一些工作,并且不会阻塞。如果它需要做 I/O,它应该触发一个异步任务并终止。否则,线程池将溢出。

.NET 4.0Task类在这里可能会有所帮助,尤其是对于它的延续方法。

于 2010-10-15T18:44:20.537 回答
0

这让我想起了旧的航空公司售票系统,在那里你需要排队。根据他们需要什么样的关注,票务请求被放置在不同的队列中。

因此,也许您可​​以拥有需要经常关注的对象队列和需要不经常关注的对象队列。必要时,您可以将它们从一个移到另一个。

您可以为频繁队列设置一个计时器,为不频繁队列设置一个计时器。对于频繁队列,您可以将其拆分为多个队列,每个线程一个。

为了处理频繁队列,您的线程数不应超过内核数。如果你有两个核心,你想做的就是让它们都运转起来。比这更多的线程不会使事情变得更快。事实上,如果处理对象需要磁盘 I/O 或为其他一些共享硬件排队,那么让两个内核都运行可能无济于事。

于 2010-10-15T23:20:54.623 回答
0

您的三个选项中的权衡是在内存和 CPU 之间。更多的计时器意味着更多的计时器节点(内存),将这些计时器聚合到更少的计时器中意味着更多的 CPU,因为您在运行时检查需要服务的事件。启动太多定时器(并使其过期)的 CPU 开销对于一个体面的定时器实现来说并不是太大的问题。

所以,在我看来,如果你有一个好的定时器实现,选择启动尽可能多的定时器(尽可能细化)。但是,如果每个对象的这些计时器中的任何一个是互斥的,请考虑重用计时器节点。

于 2010-10-15T19:16:46.437 回答