3

我通过端口和接收器将 15 个异步操作链接在一起。这让我非常关心线程间消息传递时间,特别是从任务将数据发布到端口和新任务开始在不同线程上处理相同数据之间的时间。假设每个线程在启动时处于空闲状态的最佳情况,我生成了一个测试,该测试使用秒表类来测量来自两个不同调度程序的时间,每个调度程序都以最高优先级运行单个线程。

令我惊讶的是,我的开发设备是一台运行 Windows 7 x64 的 Q6600 四核 2.4 Ghz 计算机,我的测试的平均上下文切换时间为 5.66 微秒,标准偏差为 5.738 微秒,最大值接近 1.58 毫秒( 282 倍!)。秒表频率为 427.7 纳秒,所以我仍然远离传感器噪音。

我想做的是尽可能减少线程间消息传递时间,同样重要的是,减少上下文切换的标准偏差。我意识到 Windows 不是实时操作系统,也不能保证,但 Windows 调度程序是一个基于公平循环优先级的调度,并且此测试中的两个线程都处于最高优先级(唯一应该是的线程高),所以线程上不应该有任何上下文切换(从 1.58 毫秒的最大时间可以看出......我相信 windows quanta 是 15.65 毫秒?)我唯一能想到的是操作系统调用时间的变化CCR 用于在线程之间传递消息的锁定机制。

请让我知道是否有其他人测量过线程间消息传递时间,并对如何改进它有任何建议。

这是我测试的源代码:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Microsoft.Ccr.Core;

using System.Diagnostics;

namespace Test.CCR.TestConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Starting Timer");
            var sw = new Stopwatch();
            sw.Start();

            var dispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "My Thread Pool");
            var dispQueue = new DispatcherQueue("Disp Queue", dispatcher);

            var sDispatcher = new Dispatcher(1, ThreadPriority.Highest, true, "Second Dispatcher");
            var sDispQueue = new DispatcherQueue("Second Queue", sDispatcher);

            var legAPort = new Port<EmptyValue>();
            var legBPort = new Port<TimeSpan>();

            var distances = new List<double>();

            long totalTicks = 0;

            while (sw.Elapsed.TotalMilliseconds < 5000) ;

            int runCnt = 100000;
            int offset = 1000;

            Arbiter.Activate(dispQueue, Arbiter.Receive(true, legAPort, i =>
                                                                            {
                                                                                TimeSpan sTime = sw.Elapsed;
                                                                                legBPort.Post(sTime);
                                                                            }));
            Arbiter.Activate(sDispQueue, Arbiter.Receive(true, legBPort, i =>
                                                                             {
                                                                                 TimeSpan eTime = sw.Elapsed;
                                                                                 TimeSpan dt = eTime.Subtract(i);
                                                                                 //if (distances.Count == 0 || Math.Abs(distances[distances.Count - 1] - dt.TotalMilliseconds) / distances[distances.Count - 1] > 0.1)
                                                                                 distances.Add(dt.TotalMilliseconds);

                                                                                 if(distances.Count > offset)
                                                                                 Interlocked.Add(ref totalTicks,
                                                                                                 dt.Ticks);
                                                                                 if(distances.Count < runCnt)
                                                                                     legAPort.Post(EmptyValue.SharedInstance);
                                                                             }));


            //Thread.Sleep(100);
            legAPort.Post(EmptyValue.SharedInstance);

            Thread.Sleep(500);

            while (distances.Count < runCnt)
                Thread.Sleep(25);

            TimeSpan exTime = TimeSpan.FromTicks(totalTicks);
            double exMS = exTime.TotalMilliseconds / (runCnt - offset);

            Console.WriteLine("Exchange Time: {0} Stopwatch Resolution: {1}", exMS, Stopwatch.Frequency);

            using(var stw = new StreamWriter("test.csv"))
            {
                for(int ix=0; ix < distances.Count; ix++)
                {
                    stw.WriteLine("{0},{1}", ix, distances[ix]);
                }
                stw.Flush();
            }

            Console.ReadKey();
        }
    }
}
4

4 回答 4

2

Windows 不是实时操作系统。但是你已经知道了。杀死你的是上下文切换时间,不一定是消息时间。您并没有真正指定进程间通信的工作方式。如果您真的只是运行多个线程,您会发现不使用 Windows 消息作为通信协议会有所收获,而是尝试使用应用程序托管的消息队列来滚动您自己的 IPC。

发生上下文切换时,对于任何版本的 Windows,您可以期望的最佳平均值是 1 毫秒。您可能会看到应用程序必须屈服于内核的 1 毫秒时间。这是为 Ring-1 应用程序(用户空间)设计的。如果低于 1ms 绝对关键,则需要将一些应用程序切换到 Ring-0,这意味着编写设备驱动程序。

设备驱动程序不会遭受与用户应用程序相同的上下文切换时间,并且还可以访问纳秒分辨率计时器和睡眠呼叫。如果您确实需要这样做,可以从 Microsoft 免费获得 DDK(设备驱动程序开发工具包),但我强烈建议您购买 3rd 方开发工具包。他们通常有非常好的样本和大量的向导来设置正确的东西,这需要你几个月阅读 DDK 文档才能发现。您还需要获得类似 SoftIce 的东西,因为普通的 Visual Studio 调试器不会帮助您调试设备驱动程序。

于 2009-09-29T14:30:50.947 回答
2

15个异步操作一定要异步吗?即您是否由于某些库的限制而被迫以这种方式操作,或者您是否可以选择进行同步调用?

如果您有选择,您需要构建应用程序,以便通过配置参数控制异步性的使用。在不同线程上返回的异步操作与在同一线程上返回的同步操作之间的区别应该在代码中是透明的。这样您就可以在不更改代码结构的情况下对其进行调整。

短语“令人尴尬的并行”描述了一种算法,其中正在完成的大部分工作显然是独立的,因此可以按任何顺序完成,从而易于并行化。

但是您“通过端口和接收器将 15 个异步操作链接在一起”。这可以被描述为“令人尴尬的顺序”。换句话说,同一个程序可以逻辑地写在一个线程上。但是,您将失去异步操作之间发生的 CPU 密集型工作的任何并行性(假设有任何意义)。

如果您编写一个简单的测试来消除任何重要的 CPU 密集型工作并仅测量上下文切换时间,那么猜猜看,您将测量上下文切换时间的变化,正如您所发现的那样。

运行多个线程的唯一原因是因为您有大量工作需要 CPU 完成,因此您希望在多个 CPU 之间共享它。如果单个计算块的寿命足够短,那么上下文切换将是任何操作系统的重大开销。通过将您的计算分解为 15 个阶段,每个阶段都很短,您实际上是在要求操作系统进行许多不必要的上下文切换。

于 2009-10-03T09:04:28.510 回答
0

ThreadPriority.Highest 并不意味着只有线程调度程序本身具有更高的优先级。Win32 API 具有更细粒度的线程优先级(clicky),高于最高级别(IIRC 最高通常是非管理员代码可以运行的最高优先级,管理员可以像任何硬件驱动程序/内核模式代码一样安排更高的优先级) 所以不能保证它们不会被抢占。

即使一个线程以最高优先级运行,如果其他线程持有更高优先级线程所需的资源锁,窗口也可以将其他线程提升到其基本优先级之上,这是您可能遭受上下文切换的另一种可能性。

即使那样,正如您所说,Windows 也不是实时操作系统,并且无论如何都不能保证尊重线程优先级。

于 2009-09-16T23:48:11.487 回答
0

换个方式解决这个问题,需要这么多解耦的异步操作吗?考虑一下可能有用:垂直划分工作(端到端异步处理 numCores 个数据块)而不是水平划分工作(就像现在,每个数据块在 15 个解耦阶段处理);同步耦合您的 15 个阶段中的一些,以将总数减少到较小的数量。

线程间通信的开销总是不小的。如果你的 15 个操作中的一些只做一小部分工作,那么上下文切换会咬你。

于 2009-09-29T13:48:26.287 回答