我有一个向基于 Web 的 API 提交请求的组件,但必须限制这些请求,以免违反 API 的数据限制。这意味着所有请求都必须通过一个队列来控制它们提交的速率,但它们可以(并且应该)并发执行以实现最大吞吐量。每个请求必须在将来某个时间点完成时将一些数据返回给调用代码。
我正在努力创建一个很好的模型来处理数据的返回。
使用 aBlockingCollection
我不能只Task<TResult>
从Schedule
方法中返回 a,因为入队和出队过程位于缓冲区的两端。因此,我创建了一个RequestItem<TResult>
包含表单回调的类型Action<Task<TResult>>
。
这个想法是,一旦从队列中拉出一个项目,就可以使用已启动的任务调用回调,但是到那时我已经丢失了泛型类型参数,我只能使用反射和各种讨厌的东西(如果它是甚至可能)。
例如:
public class RequestScheduler
{
private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();
public RequestScheduler()
{
this.Start();
}
// This can't return Task<TResult>, so returns void.
// Instead RequestItem is generic but this poses problems when adding to the queue
public void Schedule<TResult>(RequestItem<TResult> request)
{
_queue.Add(request);
}
private void Start()
{
Task.Factory.StartNew(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
// I want to be able to use the original type parameters here
// is there a nice way without reflection?
// ProcessItem submits an HttpWebRequest
Task.Factory.StartNew(() => ProcessItem(item))
.ContinueWith(t => { item.Callback(t); });
}
});
}
public void Stop()
{
_queue.CompleteAdding();
}
}
public class RequestItem<TResult> : IRequestItem
{
public IOperation<TResult> Operation { get; set; }
public Action<Task<TResult>> Callback { get; set; }
}
Task<TResult>
当请求从缓冲区中拉出并提交给 API 时,如何继续缓冲我的请求但向客户端返回一个?