什么是最推荐的 .NET 自定义线程池,它可以有单独的实例,即每个应用程序有多个线程池?我需要一个无限的队列大小(构建一个爬虫),并且需要为我正在爬的每个站点并行运行一个单独的线程池。
编辑:我需要尽快挖掘这些站点以获取信息,为每个站点使用单独的线程池将使我能够控制在任何给定时间在每个站点上工作的线程数。(不超过2-3个)
谢谢罗伊
什么是最推荐的 .NET 自定义线程池,它可以有单独的实例,即每个应用程序有多个线程池?我需要一个无限的队列大小(构建一个爬虫),并且需要为我正在爬的每个站点并行运行一个单独的线程池。
编辑:我需要尽快挖掘这些站点以获取信息,为每个站点使用单独的线程池将使我能够控制在任何给定时间在每个站点上工作的线程数。(不超过2-3个)
谢谢罗伊
我相信Smart Thread Pool可以做到这一点。它的 ThreadPool 类已实例化,因此您应该能够根据需要创建和管理单独的站点特定实例。
阿米吧写了一个优秀的可以实例化的智能线程池。
看看这里
问乔恩·斯基特:http ://www.yoda.arachsys.com/csharp/miscutil/
如果您需要大量并行运行的任务,.Net (TPL) 的并行扩展实际上应该工作得更好。
使用 BlockingCollection 可以用作线程的队列。这是它的一个实现。 更新于 2018-04-23:
public class WorkerPool<T> : IDisposable
{
BlockingCollection<T> queue = new BlockingCollection<T>();
List<Task> taskList;
private CancellationTokenSource cancellationToken;
int maxWorkers;
private bool wasShutDown;
int waitingUnits;
public WorkerPool(CancellationTokenSource cancellationToken, int maxWorkers)
{
this.cancellationToken = cancellationToken;
this.maxWorkers = maxWorkers;
this.taskList = new List<Task>();
}
public void enqueue(T value)
{
queue.Add(value);
waitingUnits++;
}
//call to signal that there are no more item
public void CompleteAdding()
{
queue.CompleteAdding();
}
//create workers and put then running
public void startWorkers(Action<T> worker)
{
for (int i = 0; i < maxWorkers; i++)
{
taskList.Add(new Task(() =>
{
string myname = "worker " + Guid.NewGuid().ToString();
try
{
while (!cancellationToken.IsCancellationRequested)
{
var value = queue.Take();
waitingUnits--;
worker(value);
}
}
catch (Exception ex) when (ex is InvalidOperationException) //throw when collection is closed with CompleteAdding method. No pretty way to do this.
{
//do nothing
}
}));
}
foreach (var task in taskList)
{
task.Start();
}
}
//wait for all workers to be finish their jobs
public void await()
{
while (waitingUnits >0 || !queue.IsAddingCompleted)
Thread.Sleep(100);
shutdown();
}
private void shutdown()
{
wasShutDown = true;
Task.WaitAll(taskList.ToArray());
}
//case something bad happen dismiss all pending work
public void Dispose()
{
if (!wasShutDown)
{
queue.CompleteAdding();
shutdown();
}
}
}
然后像这样使用:
WorkerPool<int> workerPool = new WorkerPool<int>(new CancellationTokenSource(), 5);
workerPool.startWorkers(value =>
{
log.Debug(value);
});
//enqueue all the work
for (int i = 0; i < 100; i++)
{
workerPool.enqueue(i);
}
//Signal no more work
workerPool.CompleteAdding();
//wait all pending work to finish
workerPool.await();
只要创建新的 WorkPool 对象,您就可以进行尽可能多的投票。
这个免费的 nuget 库在这里:CodeFluentRuntimeClient有一个您可以重用的 CustomThreadPool 类。它非常可配置,您可以更改池线程优先级、数量、COM 单元状态、甚至名称(用于调试)以及文化。
另一种方法是使用Dataflow Pipeline。我添加了这些后来的答案,因为我发现 Dataflows 是解决这类问题的更好方法,即拥有多个线程池的问题。它们提供了更灵活和结构化的方法,并且可以轻松地垂直扩展。
您可以将代码分成一个或多个块,然后与 Dataflow 链接,然后让 Dataflow 引擎根据 CPU 和内存可用性分配线程
我建议分成3个块,一个用于准备对站点页面的查询,一个用于访问站点页面,最后一个用于分析数据。这样,慢块(get)可能会分配更多的线程来补偿。
数据流设置如下所示:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
prepareBlock.LinkTo(get, linkOptions);
getBlock.LinkTo(analiseBlock, linkOptions);
数据将从 流向prepareBlock
,getBlock
然后流向analiseBlock
。块之间的接口可以是任何类,只要相同即可。请参阅Dataflow Pipeline上的完整示例
使用数据流将是这样的:
while ...{
...
prepareBlock.Post(...); //to send data to the pipeline
}
prepareBlock.Complete(); //when done
analiseBlock.Completion.Wait(cancellationTokenSource.Token); //to wait for all queues to empty or cancel