编辑:我记得有趣的采访绝对值得一看:Arun Kishan:Windows 7 内部 - 告别 Windows 内核调度程序锁
正如@Steven Sudit 所说,我再次警告:仅将它用作计时器轮如何工作的演示以及在实施它时必须关心的一些任务。不作为参考实现。在现实世界中,您必须编写更复杂的逻辑来考虑可用资源、调度逻辑等。
这里是 Steven Sudit 所说的好点(详情请阅读帖子评论):
1)选择正确的结构来保存你的工作列表(这通常取决于):
SortedList<>(或 SortedDictionary<>)在内存消耗和索引方面很好,但必须实现同步访问
ConcurrentQueue<> 将帮助您避免锁定,但您必须实现排序。它也非常节省内存
LinkedList<> 在插入和检索方面很好(无论如何我们只需要头部),但需要同步访问(通过它很容易通过无锁实现)并且内存效率不高,因为它存储了两个引用(上一个/下一个)。但是,当您拥有数百万个作业时,它就会成为一个问题,因此它们都占用了大量的内存。
但:
我完全同意@Steven:
没关系:这些都不适合。正确的答案是使用常规队列并自己维护其顺序,因为我们通常只需要从头部或尾部访问它。
通常,我建议使用库中功能最完整的集合,但这不适用于这里,因为这是系统级代码。我们需要自己动手,要么从头开始,要么在功能不太丰富的集合之上
2)为了简化同时作业的处理逻辑,您可以将委托列表(例如,通过 ConcurrentQueue 使其无锁)添加到原始 Job 类中,这样当您同时需要另一个工作时,您只需添加另一个委托即可开始。
@史蒂文:
如果两个任务实际上被安排在同一时间(无论是实际还是有效),这是一种正常情况,不需要使我们的数据结构复杂化。换句话说,我们不需要对同时进行的作业进行分组,这样我们就必须遍历两个不同的集合;我们可以让它们相邻
3)启动/停止调度程序不是那么直接,因此可能导致错误。相反,您可以在使用超时时等待事件。
@史蒂文:
这样,它要么在下一个作业准备好时唤醒,要么在头部之前插入新作业时唤醒。在后一种情况下,它可能需要现在运行它或设置不同的等待。例如,如果有 100 个作业都安排在同一时刻,我们能做的最好的事情就是将它们全部排队。
如果我们需要提供优先级,这是优先调度队列和生产者/消费者关系中的多个池的工作,但它仍然不能证明启动/停止调度程序是合理的。调度程序应始终处于开启状态,运行在有时会放弃核心的单个循环中
4)关于使用刻度:
@史蒂文:
坚持一种类型的刻度很好,但混合和匹配变得丑陋,特别是因为它依赖于硬件。我确信滴答声会比毫秒稍快,因为它存储前者并且必须除以一个常数才能得到后者。这个操作是否最终代价高昂是另一回事,但我可以使用刻度来避免风险。
我的想法:
另一个好点,我同意你的看法。但有时除以常数会变得昂贵,而且速度并不像看起来那么快。但是当我们谈论 100 000 的 DateTimes 时没关系,你是对的,谢谢你的指点。
5)“管理资源”:
@史蒂文:
我要强调的问题是对 GetAvailableThreads 的调用既昂贵又幼稚。在您甚至可以使用它之前,答案就已经过时了。如果我们真的想跟踪,我们可以通过从使用 Interlocked.Increment/Decrement 的包装器调用作业来获取初始值并保持运行计数。即使这样,它也假定程序的其余部分没有使用线程池。如果我们真的想要精细控制,那么这里的正确答案是滚动我们自己的线程池
我绝对同意调用 GetAvailableThreads 是通过 CorGetAvailableThreads 监控可用资源的幼稚方法,但成本并不高。我想证明需要管理资源,并且似乎选择了坏例子。
源代码示例中提供的任何方式都不能被视为监视可用资源的正确方法。我只是想证明你必须考虑一下。Thru 可能没有像示例那样编写好的代码。
6) 使用 Interlocked.CompareExchange:
@史蒂文:
不,这不是一种常见的模式。最常见的模式是短暂锁定。不太常见的是将变量标记为 volatile。不太常见的是使用 VolatileRead 或 MemoryBarrier。即使 Richter 这样做,以这种方式使用 Interlocked.CompareExchange 也是晦涩难懂的。在没有解释性注释的情况下使用它绝对会造成混淆,因为“比较”这个词暗示我们正在进行比较,而实际上我们没有。
你是对的,我必须指出它的用法。
using System;
using System.Threading;
// Job.cs
// WARNING! Your jobs (tasks) have to be ASYNCHRONOUS or at least really short-living
// else it will ruin whole design and ThreadPool usage due to potentially run out of available worker threads in heavy concurrency
// BTW, amount of worker threads != amount of jobs scheduled via ThreadPool
// job may waits for any IO (via async call to Begin/End) at some point
// and so free its worker thread to another waiting runner
// If you can't achieve this requirements then just use usual Thread class
// but you will lose all ThreadPool's advantages and will get noticeable overhead
// Read http://msdn.microsoft.com/en-us/magazine/cc164139.aspx for some details
// I named class "Job" instead of "Task" to avoid confusion with .NET 4 Task
public class Job
{
public DateTime FireTime { get; private set; }
public WaitCallback DoAction { get; private set; }
public object Param { get; private set; }
// Please use UTC datetimes to avoid different timezones problem
// Also consider to _never_ use DateTime.Now in repeat tasks because it significantly slower
// than DateTime.UtcNow (due to using TimeZone and converting time according to it)
// Here we always work with with UTC
// It will save you a lot of time when your project will get jobs (tasks) posted from different timezones
public static Job At(DateTime fireTime, WaitCallback doAction, object param = null)
{
return new Job {FireTime = fireTime.ToUniversalTime(), DoAction = doAction, Param = param};
}
public override string ToString()
{
return string.Format("{0}({1}) at {2}", DoAction != null ? DoAction.Method.Name : string.Empty, Param,
FireTime.ToLocalTime().ToString("o"));
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
// Dispatcher.cs
// Take a look at System.Runtime IOThreadTimer.cs and IOThreadScheduler.cs
// in Microsoft Reference Source, its interesting reading
public class Dispatcher
{
// You need sorted tasks by fire time. I use Ticks as a key to gain some speed improvements during checks
// There are maybe more than one task in same time
private readonly SortedList<long, List<Job>> _jobs;
// Synchronization object to access _jobs (and _timer) and make it thread-safe
// See comment in ScheduleJob about locking
private readonly object _syncRoot;
// Queue (RunJobs method) is running flag
private int _queueRun;
// Flag to prevent pollute ThreadPool with many times scheduled JobsRun
private int _jobsRunQueuedInThreadPool;
// I'll use Stopwatch to measure elapsed interval. It is wrapper around QueryPerformanceCounter
// It does not consume any additional resources from OS to count
// Used to check how many OS ticks (not DateTime.Ticks!) elapsed already
private readonly Stopwatch _curTime;
// Scheduler start time. It used to build time delta for job
private readonly long _startTime;
// System.Threading.Timer to schedule next active time
// You have to implement syncronized access as it not thread-safe
// http://msdn.microsoft.com/en-us/magazine/cc164015.aspx
private readonly Timer _timer;
// Minimum timer increment to schedule next call via timer instead ThreadPool
// Read http://www.microsoft.com/whdc/system/pnppwr/powermgmt/Timer-Resolution.mspx
// By default it around 15 ms
// If you want to know it exactly use GetSystemTimeAdjustment via Interop ( http://msdn.microsoft.com/en-us/library/ms724394(VS.85).aspx )
// You want TimeIncrement parameter from there
private const long MinIncrement = 15 * TimeSpan.TicksPerMillisecond;
// Maximum scheduled jobs allowed per queue run (specify your own suitable value!)
// Scheduler will add to ThreadPool queue (and hence count them as processed) no more than this constant
// This is balance between how quick job will be scheduled after it time elapsed in one side, and
// how long JobsList will be blocked and RunJobs owns its thread from ThreadPool
private const int MaxJobsToSchedulePerCheck = 10;
// Queue length
public int Length
{
get
{
lock (_syncRoot)
{
return _jobs.Count;
}
}
}
public Dispatcher()
{
_syncRoot = new object();
_timer = new Timer(RunJobs);
_startTime = DateTime.UtcNow.Ticks;
_curTime = Stopwatch.StartNew();
_jobs = new SortedList<long, List<Job>>();
}
// Is dispatcher still working
// Warning! Queue ends its work when no more jobs to schedule but started jobs can be still working
public bool IsWorking()
{
return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
}
// Just handy method to get current jobs list
public IEnumerable<Job> GetJobs()
{
lock (_syncRoot)
{
// We copy original values and return as read-only collection (thread-safety reasons)
return _jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
}
}
// Add job to scheduler queue (schedule it)
public void ScheduleJob(Job job)
{
// WARNING! This will introduce bottleneck if you have heavy concurrency.
// You have to implement lock-free solution to avoid botleneck but this is another complex topic.
// Also you can avoid lock by using Jeffrey Richter's ReaderWriterGateLock (http://msdn.microsoft.com/en-us/magazine/cc163532.aspx)
// But it can introduce significant delay under heavy load (due to nature of ThreadPool)
// I recommend to implement or reuse suitable lock-free algorithm.
// It will be best solution in heavy concurrency (if you have to schedule large enough job count per second)
// otherwise lock or maybe ReaderWriterLockSlim is cheap enough
lock (_syncRoot)
{
// We'll shift start time to quick check when it pasts our _curTime
var shiftedTime = job.FireTime.Ticks - _startTime;
List<Job> jobs;
if (!_jobs.TryGetValue(shiftedTime, out jobs))
{
jobs = new List<Job> {job};
_jobs.Add(shiftedTime, jobs);
}
else jobs.Add(job);
if (Interlocked.CompareExchange(ref _queueRun, 1, 0) == 0)
{
// Queue not run, schedule start
Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0);
ThreadPool.QueueUserWorkItem(RunJobs);
}
else
{
// else queue already up and running but maybe we need to ajust start time
// See detailed comment in RunJobs
long firetime = _jobs.Keys[0];
long delta = firetime - _curTime.Elapsed.Ticks;
if (delta < MinIncrement)
{
if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
ThreadPool.QueueUserWorkItem(RunJobs);
}
}
else
{
Console.WriteLine("DEBUG: Wake up time changed. Next event in {0}", TimeSpan.FromTicks(delta));
_timer.Change(delta/TimeSpan.TicksPerMillisecond, Timeout.Infinite);
}
}
}
}
// Job runner
private void RunJobs(object state)
{
// Warning! Here I block list until entire process done,
// maybe better will use ReadWriterLockSlim or somewhat (e.g. lock-free)
// as usually "it depends..."
// Here processing is really fast (a few operation only) so until you have to schedule many jobs per seconds it does not matter
lock (_syncRoot)
{
// We ready to rerun RunJobs if needed
Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 0, 1);
int availWorkerThreads;
int availCompletionPortThreads;
// Current thread stats
ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
// You can check max thread limits by
// ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
int jobsAdded = 0;
while (jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1 && _jobs.Count > 0)
{
// SortedList<> implemented as two arrays for keys and values so indexing on key/value will be fast
// First element
List<Job> curJobs = _jobs.Values[0];
long firetime = _jobs.Keys[0];
// WARNING! Stopwatch ticks are different from DateTime.Ticks
// so we use _curTime.Elapsed.Ticks instead of _curTime.ElapsedTicks
// Each tick in the DateTime.Ticks value represents one 100-nanosecond interval.
// Each tick in the ElapsedTicks value represents the time interval equal to 1 second divided by the Frequency.
if (_curTime.Elapsed.Ticks <= firetime) break;
while (curJobs.Count > 0 && jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1)
{
var job = curJobs[0];
// Time elapsed and we ready to start job
if (job.DoAction != null)
{
// Schedule new run
// I strongly recommend to look at new .NET 4 Task class because it give superior solution for managing Tasks
// e.g. cancel run, exception handling, continuation, etc
ThreadPool.QueueUserWorkItem(job.DoAction, job);
++jobsAdded;
// It may seems that we can just decrease availWorkerThreads by 1
// but don't forget about started jobs they can also consume ThreadPool's threads
ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
}
// Remove job from list of simultaneous jobs
curJobs.Remove(job);
}
// Remove whole list if its empty
if (curJobs.Count < 1) _jobs.RemoveAt(0);
}
if (_jobs.Count > 0)
{
long firetime = _jobs.Keys[0];
// Time to next event
long delta = firetime - _curTime.Elapsed.Ticks;
if (delta < MinIncrement)
{
// Schedule next queue check via ThreadPool (immediately)
// It may seems we start to consume all resouces when we run out of available threads (due to "infinite" reschdule)
// because we pass thru our while loop and just reschedule RunJobs
// but this is not right because before RunJobs will be started again
// all other thread will advance a bit and maybe even complete its task
// so it safe just reschedule RunJobs and hence wait when we get some resources
if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
ThreadPool.QueueUserWorkItem(RunJobs);
}
}
else // Schedule next check via timer callback
{
Console.WriteLine("DEBUG: Next event in {0}", TimeSpan.FromTicks(delta)); // just some debug output
_timer.Change(delta / TimeSpan.TicksPerMillisecond, Timeout.Infinite);
}
}
else // Shutdown the queue, no more jobs
{
Console.WriteLine("DEBUG: Queue ends");
Interlocked.CompareExchange(ref _queueRun, 0, 1);
}
}
}
}
快速使用示例:
// Test job worker
static void SomeJob(object param)
{
var job = param as Job;
if (job == null) return;
Console.WriteLine("Job started: {0}, [scheduled to: {1}, param: {2}]", DateTime.Now.ToString("o"),
job.FireTime.ToLocalTime().ToString("o"), job.Param);
}
static void Main(string[] args)
{
var curTime = DateTime.UtcNow;
Console.WriteLine("Current time: {0}", curTime.ToLocalTime().ToString("o"));
Console.WriteLine();
var dispatcher = new Dispatcher();
// Schedule +10 seconds to future
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:1"));
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:2"));
// Starts almost immediately
dispatcher.ScheduleJob(Job.At(curTime - TimeSpan.FromMinutes(1), SomeJob, "past"));
// And last job to test
dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(25), SomeJob, "+25 sec"));
Console.WriteLine("Queue length: {0}, {1}", dispatcher.Length, dispatcher.IsWorking()? "working": "done");
Console.WriteLine();
foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
Console.WriteLine();
Console.ReadLine();
Console.WriteLine(dispatcher.IsWorking()?"Dispatcher still working": "No more jobs in queue");
Console.WriteLine();
foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
Console.ReadLine();
}
希望它会有所帮助。
@Steven Sudit 指出了一些问题,所以我在这里尝试给出我的愿景。
1) 我不建议在此处或其他任何地方使用 SortedList,因为它是过时的 .NET 1.1 类
SortedList<>无论如何都不会过时。它仍然存在于 .NET 4.0 中,并在将泛型引入语言时在.NET 2.0中引入。我看不出将它从 .NET 中删除的任何意义。
但我试图回答的真正问题是:什么数据结构可以按排序顺序存储值,并且在存储和索引方面将是有效的。有两种合适的即用型数据结构:SortedDictionary<>和SortedList<>。这里有一些关于如何选择的信息。我只是不想用我自己的代码浪费实现并隐藏主要算法。在这里我可以实现优先级数组或其他东西,但它需要更多的代码行。我看不出有任何理由不在这里使用 SortedList<> ...
顺便说一句,我不明白你为什么不推荐它?原因是什么?
2) 一般来说,对于同时发生的事件,不需要用特殊情况使代码复杂化。
当@Jrud 说他可能会有很多任务要安排时,我认为他们可能有大量的并发,所以我演示了如何解决它。但我的观点是:即使你的并发性很低,你仍然有机会同时获得事件。这在多线程环境中或当有许多源想要安排作业时也很容易实现。
联锁功能没有那么复杂、便宜,而且由于 .NET 4.0 内联,所以在这种情况下添加保护没有问题。
3) IsWorking 方法应该只使用内存屏障,然后直接读取值。
我不太确定你是对的。我建议阅读两篇不错的文章:第 4 部分: Joseph Albahari 的 C# 中线程的高级线程和锁如何锁定?杰夫·莫泽。当然,Jeffrey Richter 的 CLR 的第 28 章(原始线程同步结构)通过 C#(第 3 版)。
这里有一些问题:
MemoryBarrier 方法不访问内存,但它强制在调用 MemoryBarrier 之前完成任何较早的程序顺序加载和存储。它还强制在调用 MemoryBarrier 之后完成任何以后的程序顺序加载和存储。MemoryBarrier 远不如其他两种方法有用
重要我知道这可能会很混乱,所以让我把它总结为一个简单的规则:当线程通过共享内存相互通信时,通过调用 VolatileWrite 写入最后一个值,并通过调用 VolatileRead 读取第一个值。
我还推荐:英特尔® 64 和 IA-32 架构软件开发人员手册,如果您认真的话。
所以我不在我的代码中使用 VolatileRead/VolatileWrite 也不是 volatile 关键字,我不认为 Thread.MemoryBarrier 在这里会更好。也许你可以指出我想念的东西?一些文章或深入讨论?
4) GetJobs 方法看起来可以锁定很长一段时间。有必要吗?
首先,它只是一种方便的方法,有时至少需要将所有任务排入队列以进行调试。
但你是不对的。正如我在代码注释中提到的 SortedList<> 实现为两个数组,您可以通过引用源或仅通过在反射器中查看来检查这一点。这里有一些来自参考来源的评论:
// A sorted list internally maintains two arrays that store the keys and
// values of the entries.
我从 .NET 4.0 获得,但自 2-3.5 以来变化不大
所以我的代码:
_jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
涉及以下内容:
- 遍历 List 引用数组中的值。索引数组非常快。
- 遍历每个 List (在内部也作为数组实现)。它也非常快。
- 构建新的引用列表(通过 ToList())也非常快(只是动态数组)(.NET 具有非常可靠和快速的实现)
- 构建只读包装器(没有副本,只是迭代器包装器)
因此,我们只是展平了对 Job 对象的引用的只读列表。即使您有数百万个任务,它也非常快。试着衡量自己。
我添加它以显示执行周期期间发生的任何方式(出于调试目的),但我认为它可能很有用。
5) .NET 4.0 中提供了无锁队列。
我建议阅读Stephen Toub的并行编程模式和.NET Framework 4 中的线程安全集合及其性能特征,这里还有许多有趣的文章。
所以我引用:
ConcurrentQueue(T) 是 .NET Framework 4 中的一种数据结构,它提供对 FIFO(先进先出)有序元素的线程安全访问。在底层,ConcurrentQueue(T) 是使用小数组列表和头尾数组上的无锁操作实现的,因此它与由数组支持并依赖外部使用的 Queue(T) 完全不同监视器以提供同步。ConcurrentQueue(T) 肯定比手动锁定 Queue(T) 更安全和方便,但需要进行一些实验来确定这两种方案的相对性能。在本节的其余部分,我们将手动锁定的 Queue(T) 称为自包含类型,称为 SynchronizedQueue(T)。
它没有任何方法来维护有序队列。没有任何新的线程安全集合,它们都维护无序集合。但是阅读原始@Jrud 描述,我认为我们必须维护需要触发任务的有序时间列表。我错了吗?
6)我不会费心启动和停止调度程序;让它睡到下一份工作
你知道制作睡眠线程池线程的好方法吗?你将如何实施它?
我认为调度员在不处理任何任务并安排工作唤醒它时会“睡觉”。无论如何,没有特殊的处理让它进入睡眠或唤醒,所以在我看来这个过程等于“睡眠”。
如果您告诉我应该在没有可用作业的情况下通过 ThreadPool 重新安排 RunJobs,而您错了,它将消耗太多资源并可能影响已启动的作业。自己试试。当我们可以轻松避免时,为什么要做不必要的工作。
7) 不必担心不同种类的滴答声,你可以坚持毫秒。
你看起来不太对劲。你要么坚持蜱虫,要么完全不关心它。检查 DateTime 实现,每次访问毫秒属性都涉及将内部表示(以刻度为单位)转换为毫秒,包括除法。这会损害旧(奔腾级)计算机的性能(我自己测量,你也可以)。
总的来说,我会同意你的看法。我们在这里不关心表示,因为它不会给我们带来明显的性能提升。
这只是我的习惯。我在最近的项目中处理了数十亿个 DateTime,因此对其进行了相应的编码。在我的项目中,按刻度和 DateTime 的其他组件进行处理之间存在显着差异。
8) 尝试跟踪可用线程似乎不太可能有效
我只是想证明你必须关心它。在现实世界中,您必须实现远离我调度和监控资源的直接逻辑。
我想演示计时器轮算法并指出作者在实现它时必须考虑的一些问题。
你是绝对正确的,我必须警告它。我认为“快速原型”就足够了。我的解决方案无论如何都不能用于生产。