5

我需要将 Web 服务请求发送到在线 api,并且我认为 Parallel Extensions 非常适合我的需求。

有问题的 Web 服务旨在被重复调用,但有一种机制,如果您每秒调用超过一定数量,就会向您收费。我显然想尽量减少我的费用,所以想知道是否有人见过可以满足以下要求的 TaskScheduler:

  1. 限制每个时间跨度安排的任务数。我猜如果请求的数量超过了这个限制,那么它需要丢弃任务或可能阻塞?(停止积压的任务)
  2. 检测相同的请求是否已经在要执行的调度程序中但尚未执行,如果是,则不将第二个任务排队,而是返回第一个任务。

人们是否觉得这些是任务调度程序应该处理的职责,还是我在找错树?如果您有其他选择,我愿意接受建议。

4

4 回答 4

8

我同意其他人的观点,即 TPL Dataflow 听起来是一个很好的解决方案。

为了限制处理,您可以创建一个TransformBlock实际上不会以任何方式转换数据的方法,如果它在之前的数据之后过早到达,它只会延迟它:

static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
    DateTime lastItem = DateTime.MinValue;
    return new TransformBlock<T, T>(
        async x =>
                {
                    var waitTime = lastItem + delay - DateTime.UtcNow;
                    if (waitTime > TimeSpan.Zero)
                        await Task.Delay(waitTime);

                    lastItem = DateTime.UtcNow;

                    return x;
                },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}

然后创建一个生成数据的方法(例如从 0 开始的整数):

static async Task Producer(ITargetBlock<int> target)
{
    int i = 0;
    while (await target.SendAsync(i))
        i++;
}

它是异步编写的,因此如果目标块现在无法处理项目,它将等待。

然后写一个消费者方法:

static void Consumer(int i)
{
    Console.WriteLine(i);
}

最后,将它们链接在一起并启动它:

var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));

var consumerBlock = new ActionBlock<int>(
    (Action<int>)Consumer,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);

在这里,delayBlock每 500 毫秒最多接受一项,并且该Consumer()方法可以并行运行多次。要完成处理,请调用delayBlock.Complete()

如果您想为每个 #2 添加一些缓存,您可以创建另一个TransformBlock在那里完成工作并将其链接到其他块。

于 2012-03-21T17:06:34.417 回答
3

老实说,我会在更高的抽象级别上工作,并为此使用 TPL 数据流 API。唯一的问题是您需要编写一个自定义块,以您需要的速率限制请求,因为默认情况下,块是“贪婪的”,并且会尽可能快地处理。实现将是这样的:

  1. BufferBlock<T>您要发布到的逻辑块开始。
  2. 将 链接BufferBlock<T>到具有请求/秒和限制逻辑知识的自定义块。
  3. 将自定义块从 2 链接到您的ActionBlock<T>.

我现在没有时间为#2 编写自定义块,但是如果您还没有弄清楚,我会稍后再检查并尝试为您填写一个实现。

于 2012-03-20T22:31:14.690 回答
2

我没有太多使用 RX,但是 AFAICT Observable.Window 方法可以很好地解决这个问题。

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window(VS.103).aspx

它似乎比 Throttle 更合适,Throttle 似乎把元素扔掉了,我猜这不是你想要的

于 2012-03-21T03:41:20.983 回答
0

如果您需要按时间节流,您应该查看Quartz.net。它可以促进一致的轮询。如果您关心所有请求,则应考虑使用某种排队机制。MSMQ 可能是正确的解决方案,但如果您想扩大规模并使用NServiceBusRabbitMQ之类的 ESB,则有许多特定的实现。

更新:

在这种情况下,如果您可以利用 CTP,则 TPL 数据流是您的首选解决方案。一个受限制的 BufferBlock 是解决方案。

此示例来自Microsoft 提供的文档

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> m_buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await m_buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

更新:

查看 RX 的Observable.Throttle

于 2012-03-20T22:03:30.410 回答