2

我有一个向基于 Web 的 API 提交请求的组件,但必须限制这些请求,以免违反 API 的数据限制。这意味着所有请求都必须通过一个队列来控制它们提交的速率,但它们可以(并且应该)并发执行以实现最大吞吐量。每个请求必须在将来某个时间点完成时将一些数据返回给调用代码。

我正在努力创建一个很好的模型来处理数据的返回。

使用 aBlockingCollection我不能只Task<TResult>Schedule方法中返回 a,因为入队和出队过程位于缓冲区的两端。因此,我创建了一个RequestItem<TResult>包含表单回调的类型Action<Task<TResult>>

这个想法是,一旦从队列中拉出一个项目,就可以使用已启动的任务调用回调,但是到那时我已经丢失了泛型类型参数,我只能使用反射和各种讨厌的东西(如果它是甚至可能)。

例如:

public class RequestScheduler 
{
    private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();

    public RequestScheduler()
    {
        this.Start();
    }

    // This can't return Task<TResult>, so returns void.
    // Instead RequestItem is generic but this poses problems when adding to the queue
    public void Schedule<TResult>(RequestItem<TResult> request)
    {
        _queue.Add(request);
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                // I want to be able to use the original type parameters here
                // is there a nice way without reflection?
                // ProcessItem submits an HttpWebRequest
                Task.Factory.StartNew(() => ProcessItem(item))
                   .ContinueWith(t => { item.Callback(t); });
            }
        });
    }

    public void Stop()
    {
        _queue.CompleteAdding();
    }
}

public class RequestItem<TResult> : IRequestItem
{
    public IOperation<TResult> Operation { get; set; }
    public Action<Task<TResult>> Callback { get; set; }
}

Task<TResult>当请求从缓冲区中拉出并提交给 API 时,如何继续缓冲我的请求但向客户端返回一个?

4

1 回答 1

2

首先,你可以Task<TResult>从中返回Schedule(),你只需要使用TaskCompletionSource它。

其次,为了解决泛型问题,您可以将所有内容隐藏在 (non-generic)Action中。在Schedule()中,使用完全符合您需要的 lambda 创建一个操作。然后消费循环将执行该操作,它不需要知道里面有什么。

第三,我不明白你为什么要Task在循环的每次迭代中开始一个新的。一方面,这意味着您实际上不会受到任何限制。

通过这些修改,代码可能如下所示:

public class RequestScheduler
{
    private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();

    public RequestScheduler()
    {
        this.Start();
    }

    private void Start()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var action in m_queue.GetConsumingEnumerable())
            {
                action();
            }
        }, TaskCreationOptions.LongRunning);
    }

    public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
    {
        var tcs = new TaskCompletionSource<TResult>();

        Action action = () =>
        {
            try
            {
                tcs.SetResult(ProcessItem(operation));
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
        };

        m_queue.Add(action);

        return tcs.Task;
    }

    private T ProcessItem<T>(IOperation<T> operation)
    {
        // whatever
    }
}
于 2013-03-19T22:12:10.473 回答