1

我目前正在编写一个 ProxyChecker 库。我正在使用一个有趣的 Parallel.ForEach 循环来检查所有代理的线程。我正在使用CancellationTokenSource(cts) 进行软中止(使用cts.Cancel())。正如您在下面的代码中看到的那样,我添加了一些“测试代码”,它将当前线程写入控制台。

这是您需要的代码:

private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
        {
            _cts = new CancellationTokenSource();
            int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
            mainThread = new Thread(() =>
            {
                try
                {
                    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
                    {
                        Interlocked.Increment(ref running);
                        Console.WriteLine("thread running: {0}", running);
                        try
                        {
                            _cts.Token.ThrowIfCancellationRequested();
                            if (CheckProxy(prox, domainToCheckWith, timeout))
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Increment(ref goodProxies);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                            }
                            else
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                                Interlocked.Increment(ref badProxies);
                            }
                            _cts.Token.ThrowIfCancellationRequested();
                            OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                        }
                        catch (OperationCanceledException ex) {}
                        catch (ObjectDisposedException ex) {}
                        catch (Exception ex)
                        {
                            OnLog(ex.Message, Color.Red);
                        }
                        finally
                        {
                            Console.WriteLine("thread running: {0}", running);
                            Interlocked.Decrement(ref running);
                        }
                    });
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    isRunning = false;
                    OnComplete();
                }
            });
            mainThread.Start();
        }

输出(我去掉了几行,因为给你完整的代码是没用的)

thread running: 1
thread running: 1
thread running: 2
thread running: 2

//Slowly going up to  50

thread running: 50
thread running: 50
thread running: 50

//Staying at 50 till I press stop

thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46

//Going down...

thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4

然后它停在 4 或 3 或 2 (每次不同)。我等了几分钟,但在 Parallel.ForEach 执行后它并没有关闭,也没有代码。

请求的超时时间为 5000,线程数为 50。

这是用于检查的其他代码:

private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);

        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}

停止功能:

public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}

重要编辑:

我将此添加到 CeckProxy 函数中:

        Stopwatch sw = new Stopwatch();
        sw.Start();
        string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
        sw.Stop();

这是最后几个线程的结果:

thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3

为什么这么长?我的意思是180秒?!

4

2 回答 2

0

您可以尝试 lock inside 尝试锁定对象

Object lockObject = new Object();
try
{
    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
    {
        Interlocked.Increment(ref running);
        Console.WriteLine("thread running: {0}", running);
        try
        {
            lock(lockObject)
            {
                //code.............
            }
        }
        catch
        {
        }
    }
}
catch
{
}
于 2014-06-15T20:56:14.240 回答
0

好的,我自己想通了。

我现在连续阅读响应并使用秒表(和 request.ReadWriteTimeout)检查读取部分在达到特定时间(在我的情况下readTimeout)后停止。代码

HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
            req.Proxy = new WebProxy(proxy);
            req.Timeout = timeout;
            req.ReadWriteTimeout = readTimeout;
            req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
            req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;

            byte[] responseByte = new byte[1024];
            string responseString = string.Empty;

            sw.Start();
            using (WebResponse res = req.GetResponse())
            {
                using (Stream stream = res.GetResponseStream())
                {
                    while (stream.Read(responseByte, 0, responseByte.Length) > 0)
                    {
                        responseString += Encoding.UTF8.GetString(responseByte);
                        if(sw.ElapsedMilliseconds > (long)timeout)
                            throw new WebException();
                    }

                }
            }
            sw.Stop();
于 2014-06-16T10:30:54.443 回答