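I'm currently writing a ProxyChecker library. I'm using a Parallel.ForEach loop to check all the proxies on multiple threads. I'm using a CancellationTokenSource (cts) to do a soft abort (with cts.Cancel()). As you can see in the code below, I've added some "test code" that writes the number of currently running threads to the console.
Here is the relevant code: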
private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
{
    _cts = new CancellationTokenSource();
    int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
    mainThread = new Thread(() =>
    {
        try
        {
            Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
            {
                Interlocked.Increment(ref running);
                Console.WriteLine("thread running: {0}", running);
                try
                {
                    _cts.Token.ThrowIfCancellationRequested();
                    if (CheckProxy(prox, domainToCheckWith, timeout))
                    {
                        Interlocked.Increment(ref checkedProxyCount);
                        Interlocked.Increment(ref goodProxies);
                        Interlocked.Decrement(ref uncheckedProxyCount);
                    }
                    else
                    {
                        Interlocked.Increment(ref checkedProxyCount);
                        Interlocked.Decrement(ref uncheckedProxyCount);
                        Interlocked.Increment(ref badProxies);
                    }
                    _cts.Token.ThrowIfCancellationRequested();
                    OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    Console.WriteLine("thread running: {0}", running);
                    Interlocked.Decrement(ref running);
                }
            });
        }
        catch (OperationCanceledException ex) {}
        catch (ObjectDisposedException ex) {}
        catch (Exception ex)
        {
            OnLog(ex.Message, Color.Red);
        }
        finally
        {
            isRunning = false;
            OnComplete();
        }
    });
    mainThread.Start();
}
Output (I removed some lines, since posting all of it would be pointless):
thread running: 1
thread running: 1
thread running: 2
thread running: 2
//Slowly going up to 50
thread running: 50
thread running: 50
thread running: 50
//Staying at 50 till I press stop
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46
//Going down...
thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4
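Then it stops at 4, 3 or 2 (it differs every time). I waited several minutes, but the count doesn't go down any further and the code after the Parallel.ForEach is never executed.
The request timeout is 5000 ms and the thread count is 50.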
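For context, a hang like this is at least consistent with how Parallel.ForEach handles cancellation: cancelling the token stops new iterations from starting, but the call only returns (by throwing OperationCanceledException) once every iteration already in flight has finished. Below is a minimal, self-contained sketch of that behaviour. Thread.Sleep stands in for a blocking network call, and the item count, degree of parallelism and durations are made-up values for illustration only.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class CancellationDemo
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4,      // illustrative value
            CancellationToken = cts.Token
        };
        var sw = Stopwatch.StartNew();

        // Request cancellation shortly after the loop has started.
        cts.CancelAfter(200);

        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), options, i =>
            {
                // Stand-in for a blocking call that never looks at the token.
                Thread.Sleep(2000);
            });
        }
        catch (OperationCanceledException)
        {
            // Reached only after the iterations that were already running have completed.
            Console.WriteLine("Cancelled after {0} ms", sw.ElapsedMilliseconds);
        }
    }
}
```

In this sketch the elapsed time should come out at roughly two seconds rather than 200 ms, because the loop waits for the sleeping iterations. If the same applies here, the last few "running" threads would simply be iterations still blocked inside CheckProxy.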
Here is the other code, which does the actual checking:
private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);
        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}
The stop function:
public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}
IMPORTANT EDIT:
I added this to the CheckProxy function:
Stopwatch sw = new Stopwatch();
sw.Start();
string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
sw.Stop();
These are the results (elapsed milliseconds) for the last few threads:
thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3
Why does it take so long? I mean, 180 seconds?!
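One possible explanation, offered as an assumption rather than a confirmed diagnosis: WebRequest.Timeout covers GetResponse(), but not the later reads from the response stream. Those are governed by HttpWebRequest.ReadWriteTimeout, whose default is 300,000 ms, so a proxy that sends data very slowly could keep ReadToEnd() blocked long past the 5000 ms timeout. The sketch below shows one way to bound each read and to abort the request when cancellation is requested. FetchThroughProxy and the ProxyFetch class are hypothetical names that are not part of the code above, and the cast assumes the URL is an http or https address.

```csharp
using System;
using System.IO;
using System.Net;
using System.Threading;

static class ProxyFetch
{
    // Hypothetical helper: returns the response body, or null on failure or cancellation.
    public static string FetchThroughProxy(string url, string proxy, int timeout, CancellationToken token)
    {
        // Assumes an http/https URL, so that Create returns an HttpWebRequest.
        var req = (HttpWebRequest) WebRequest.Create(url);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;           // applies to GetResponse()
        req.ReadWriteTimeout = timeout;  // applies to each read on the response stream

        // Abort the request when cancellation is requested, so a blocked call can return.
        using (token.Register(() => req.Abort()))
        {
            try
            {
                using (var response = (HttpWebResponse) req.GetResponse())
                using (var reader = new StreamReader(response.GetResponseStream()))
                {
                    return reader.ReadToEnd();
                }
            }
            // A timed-out or aborted request may surface as any of these.
            catch (WebException) { return null; }
            catch (IOException) { return null; }
            catch (ObjectDisposedException) { return null; }
        }
    }
}
```

Note that ReadWriteTimeout limits each individual read rather than the total download time, so a proxy that trickles a few bytes at a time can still take longer than the timeout overall. The token.Register callback is what gives a soft abort a chance to interrupt such a request.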