2
  1. 我有一个使用连接的方法(例如下载页面的方法)。
  2. 我必须多次执行此方法(例如下载 1000 页)。
  3. 以同步和顺序方式执行此操作需要很长时间。
  4. 我的资源有限(最多 8 个线程和/或最多 50 个同时连接)
  5. 我想利用所有资源来加速它。
  6. 我知道并行化(PLINQ、Parallel Extensions等)可以解决问题,但我已经尝试过了,由于资源稀缺,这种方法失败了。
  7. 我不想在管理资源的同时重新发明并行此类任务的轮子,必须有人以前做过,并且必须为此提供了一个库/教程。

任何人都可以帮忙吗?

更新当您开始将异步调用与并行化混合以获得最佳性能时,事情会变得更加复杂。这是在几个下载器上实现的,例如 Firefox 下载器,它同时获得 2 个下载,当其中一个完成时,它会获得下一个文件,依此类推。也许实现起来似乎很简单,但是当我实现它时,我曾经并且仍然很难将其通用化(对 WebRequest 和 DbCommand 有用)并处理问题(即超时)

赏金猎人赏金将授予第一个链接可靠且免费的 ($$) .NET 库的人,该库提供了一种简单的 C# 方法来并行化异步任务,如 HttpWebRequests.BegingetResponse 和 SqlCommand.BeginExecuteNonQuery。并行化不能等待 N 个任务完成再启动下一个 N,而是必须在 N 个初始任务之一完成后立即启动一个新任务。该方法必须提供超时处理。

4

11 回答 11

5

查看连接的计数信号量。 http://en.wikipedia.org/wiki/Semaphore_(编程)

编辑:要回答您的评论,.NET Framework 已经有一个。http://msdn.microsoft.com/en-us/library/system.threading.semaphore.aspx

于 2009-01-27T18:15:12.733 回答
5

您能否提供更多信息,为什么 Parallel Linq 不起作用?

我的观点是,您的任务最适合 Plinq。如果您在 8 核机器上运行,PLinq 将拆分为 8 个任务,并为您排队所有剩余的任务。

这是草稿代码,

PagesToDownload.AsParallel().ForAll(DownloadMethodWithLimitConnections);

我不明白为什么 Plinq 会消耗你的资源。根据我的测试,PLinq 的性能甚至比使用 ThreadPool 还要好。

于 2009-02-01T14:22:02.290 回答
4

请参阅CCR。这做到这一点的“正确”方法,尽管您可能会发现图书馆的学习曲线有点多......

于 2009-02-01T14:23:18.387 回答
3

您可以使用 .NETSystem.Threading.ThreadPool类。您可以使用ThreadPool.SetMaxThreads().

于 2009-01-27T18:09:24.820 回答
3

这是我不明白的:你说最多 50 个连接,但只有 8 个线程。根据定义,每个连接“占用”/在一个线程中运行。我的意思是,您没有使用 DMA 或任何其他魔法来减轻 CPU 的负载,因此每次传输都需要一个执行上下文。如果您可以一次启动 50 个异步请求,很好,很好,这样做 - 您应该能够从同一个线程中启动它们,因为调用异步读取函数基本上不需要任何时间。例如,如果您有 8 个核心并希望确保整个核心专用于每次传输(这可能很愚蠢,但这是您的代码,所以...),您一次只能运行 8 次传输。

我的建议是在一个同步块内启动 50 个异步请求,以便它们在您允许它们中的任何一个完成之前全部启动(简化数学)。然后,使用 Jeremy 建议的计数信号量或 mbeckish 建议的同步队列来跟踪剩余的工作。在异步完成回调结束时,启动下一个连接(如果合适)。也就是说,启动 50 个连接,然后当一个连接完成时,使用“完成”事件处理程序启动下一个连接,直到所有工作完成。这不应该需要任何类型的额外库或框架。

于 2009-02-04T16:12:18.587 回答
2

我强烈建议远离线程池,除非是非常短的任务。如果您选择使用信号量,请确保您只阻塞正在排队工作项的代码,而不是在工作项代码的开头,否则如果您的 (semaphore max count * 2) 是,您将很快死锁线程池大于最大池线程。

在实践中,您真的永远无法安全地获取池线程上的锁,也无法安全地调用大多数异步 API(或同步 API,如 HttpWebRequest.GetResponse,因为它还在线程池的掩护下执行异步操作)。

于 2009-01-27T19:54:59.757 回答
2
  1. 创建一个数据结构来跟踪哪些页面已被提取,以及哪些页面仍需要被提取。例如队列

  2. 使用 Producer/Consumer Queue 模式,分派 8 个消费者线程来进行获取。这样,您就知道您永远不会超过 8 个线程的限制。

请参阅此处以获取一个很好的示例。

于 2009-01-27T20:12:24.353 回答
2

Jeffrey Richter 有一个 Power Threading Library 可能会对您有所帮助。它塞满了样本,而且非常强大。我找不到有关连接的快速示例,但是在协调多个异步操作方面,有很多示例可能对您有用。

它可以从这里下载,这里几篇文章和示例。此外,此链接有一篇来自 Jeffrey 的详细文章,解释了并发异步操作。

于 2009-02-01T14:20:27.490 回答
1

异步 WebRequest 方法可能会显得迟缓,因为它们在执行 DNS 查找时会阻塞,然后切换到异步行为。在我自己遵循这条路径之后,启动 8 个线程将请求提供给已经启动线程来完成大部分工作的 API 似乎效率低下。您可能会重新考虑一些具有异步 WebRequest API 缺点的方法。我们的解决方案最终涉及使用同步 API,每个都在自己的线程上。我会对任何评论这种方法的正确性的人感兴趣。

于 2009-02-01T15:17:22.943 回答
1

这就是您在 .net 3.5 中使用基类库的方式:对 SetMinThreads 的调用是可选的 - 看看有没有它会发生什么。

您应该在替换为 DoSomethingThatsSlow 时处理超时

public class ThrottledParallelRunnerTest
{
    public static void Main()
    {
        //since the process is just starting up, we need to boost this
        ThreadPool.SetMinThreads(10, 10);

        IEnumerable<string> args = from i in Enumerable.Range(1, 100)
                                   select "task #" + i;
        ThrottledParallelRun(DoSomethingThatsSlow, args, 8);
    }

    public static void DoSomethingThatsSlow(string urlOrWhatever)
    {
        Console.Out.WriteLine("{1}: began {0}", urlOrWhatever, DateTime.Now.Ticks);
        Thread.Sleep(500);
        Console.Out.WriteLine("{1}: ended {0}", urlOrWhatever, DateTime.Now.Ticks);
    }

    private static void ThrottledParallelRun<T>(Action<T> action, IEnumerable<T> args, int maxThreads)
    {
        //this thing looks after the throttling
        Semaphore semaphore = new Semaphore(maxThreads, maxThreads);

        //wrap the action in a try/finally that releases the semaphore
        Action<T> releasingAction = a =>
                                        {
                                            try
                                            {
                                                action(a);
                                            }
                                            finally
                                            {
                                                semaphore.Release();
                                            }
                                        };

        //store all the IAsyncResult - will help prevent method from returning before completion
        List<IAsyncResult> results = new List<IAsyncResult>();
        foreach (T a in args)
        {
            semaphore.WaitOne();
            results.Add(releasingAction.BeginInvoke(a, null, null));
        }

        //now let's make sure everything's returned. Maybe collate exceptions here?
        foreach (IAsyncResult result in results)
        {
            releasingAction.EndInvoke(result);
        }
    }
}
于 2009-02-06T08:03:54.313 回答
1

您应该看看 F# 异步工作流。

你真的不希望你的代码是并行但异步的

异步是指执行一些不需要阻塞调用线程的长时间运行操作的程序,例如访问网络、调用 Web 服务或执行任何其他一般 I/O 操作

这是一篇关于这个概念的非常有趣的文章,使用 C# 迭代器进行了解释。

这是一关于 F# 和异步编程的好书。

学习曲线非常糟糕(很多奇怪的东西:F# 语法、Async<'a> 类型、monad 等),但它是一种非常强大的方法,并且可以在现实生活中与出色的 C# 互操作一起使用。

The main idea here is continuation: while your're wating for some I/O operations let your threads do something else!

于 2009-02-07T03:22:08.310 回答