我正在构建一个通用的 URI 检索系统。本质上,有一个通用类Retriever<T>
,它维护一个要检索的 URI 队列。它有一个单独的线程,可以尽可能快地处理该队列。如问题标题所示,URI 类型的一个示例是 HTTP 类型 URI。
问题是,当我开始通过抽象方法请求检索资源时,T RetrieveResource(Uri location)
由于缺乏异步性,它会变慢。
更改 to 的返回类型RetrieveResource
是Task<T>
我的第一个想法。但是,当我们有数千个未完成的任务时,这似乎会使任务堆积起来并导致很多问题。它似乎创建了许多实际线程,而不是利用线程池。我想这只会减慢一切,因为一次发生的事情太多了,所以没有任何单独的事情取得重大进展。
预计我们将有大量排队的项目要检索,并且它们的处理速度不能像排队一样快。随着时间的推移,系统有机会赶上;但这绝对不会很快。
我还考虑过而不是维护一个队列和一个线程来处理它......只是在ThreadPool
. 但是,如果说我需要在处理所有工作项之前关闭系统,或者稍后想要允许优先级排序等,我不确定这是否理想。
我们也知道检索资源是一个耗时的过程(0.250 - 5 秒),但不一定是资源密集型过程。我们可以将其并行化为数百个请求。
我们的要求是:
- URI 可以从任何线程入队,即使系统正在处理队列
- 检索可能需要稍后能够被优先处理
- 检索应该可以暂停
- 当没有检索到任何东西时应该发生最小的旋转(
BlockingCollection
在这里很有用)。
有没有一种在不引入不必要的复杂性的情况下并行化的好方法?
下面是我们现有的一些代码,作为示例。
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
private readonly Thread worker;
private readonly BlockingCollection<Uri> pending;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrievalEventArgs<T>> Retrieved;
protected Retriever()
{
this.worker = new Thread(this.RetrieveResources);
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.isStarted = 0;
this.isDisposing = 0;
}
~Retriever()
{
this.Dispose(false);
}
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
// This is what needs to be concurrently done.
// In this example, it's synchronous, but just on a separate thread.
T result = this.RetrieveResource(location);
// At this point, we would fire our event with the retrieved data
}
}
protected abstract T RetrieveResource(Uri location);
protected void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.pending.CompleteAdding();
this.worker.Join();
}
}
public void Add(Uri uri)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
public void AddRange(IEnumerable<Uri> uris)
{
foreach (Uri uri in uris)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
}
public void Start()
{
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
if (this.worker.ThreadState == ThreadState.Unstarted)
{
this.worker.Start();
}
Monitor.Pulse(this.pending);
}
public void Stop()
{
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
以上面的例子为基础......我认为这个解决方案增加了太多的复杂性,或者更确切地说,奇怪的代码......就是这样。
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
Task<T> task = new Task<T>((state) =>
{
return this.RetrieveResource(state as Uri);
}, location);
task.ContinueWith((t) =>
{
T result = t.Result;
RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);
EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
});
task.Start();
}
}