1

我有 8 个逻辑处理器。执行以下代码时

  public void test()
    {
        Parallel.For( 1, 1001, i => { IntensiveWork(i); });
    }

    private static void IntensiveWork(int i)
    {
        Random r=new Random();
        Thread.Sleep(r.Next(i*1));
    }

我注意到 Parallel.For 制作了多批 8 个作业。每个批次将按顺序执行。这里的问题是,如果批处理中的 7/8 个作业完成,那么下一批将继续等待最后一个作业完成。这意味着 7 个核心不会很忙。有没有更好的实现并行性的方法是 C#,其中一旦批处理中的作业完成,它将为该核心分配另一个作业。

4

2 回答 2

1

您可以创建多个任务将从中读取的单个队列。

static void test()
{
    ConcurrentQueue<int> queue = new ConcurrentQueue<int>(Enumerable.Range(1, 1000));
    int taskCount = Environment.ProcessorCount;
    Task[] tasks = new Task[taskCount];
    for (int taskIndex = 0; taskIndex < taskCount; taskIndex++)
    {
        Task task = Task.Factory.StartNew(() => IntensiveWorkTask(queue));
        tasks[taskIndex] = task;
    }
    Task.WaitAll(tasks);
}

private static void IntensiveWorkTask(ConcurrentQueue<int> queue)
{
    while (queue.TryDequeue(out int value))
        IntensiveWork(value);
}

private static void IntensiveWork(int i)
{
    Random r = new Random();
    Thread.Sleep(r.Next(i * 1));
}
于 2019-02-24T08:39:58.313 回答
0

试试微软的反应式框架(又名 Rx)——只需 NuGet System.Reactive,然后添加using System.Reactive.Linq;——然后你可以这样做:

public void test()
{
    IObservable<Unit> query =
        Observable
            .Range(1, 1000)
            .SelectMany(i =>
                Observable
                    .Start(() => IntensiveWork(i)));

    IDisposable subscription = query.Subscribe();
}

private static Random r = new Random();

private static void IntensiveWork(int i)
{
    Thread.Sleep(r.Next(i * 1));
}

.Subscribe(...以便能够在每个工作项完成时对其做出响应。

于 2019-02-24T11:53:00.040 回答