19

Using the Dataflow CTP (in the TPL)

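Is there a way to call BatchBlock.TriggerBatch automatically after a timeout if the number of currently queued or postponed items is less than the BatchSize?

Better still: the timeout should be reset to 0 every time the block receives a new item.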

4

4 Answers

26

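Yes, you can accomplish this rather elegantly by chaining blocks together. In this case you want to set up a TransformBlock that you link before the BatchBlock. It would look something like this: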

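// System.Threading.Timer expects a TimerCallback, which takes a state argument.
// The timer is created disabled; the first item to arrive arms it.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());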

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

timeoutTransformBlock.LinkTo(yourBatchBlock);

yourBufferBlock.LinkTo(timeoutTransformBlock);
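
For anyone who wants to try the chain above end to end, here is a minimal console sketch. The class name, the 200 ms timeout, and the batch size of 10 are my own assumptions for the demo, not part of the answer. Three items are posted, which is below the batch size, so the batch should only appear once the timer fires after the last item.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class IdleTimeoutBatchDemo
{
    // Posts three items (fewer than the batch size) and returns the batch
    // that the inactivity timer forces out.
    public static async Task<int[]> RunAsync()
    {
        var batchBlock = new BatchBlock<int>(10);

        // The timer is created disabled; TriggerBatch flushes whatever is queued.
        var timer = new Timer(_ => batchBlock.TriggerBatch());

        var timeoutBlock = new TransformBlock<int, int>(value =>
        {
            // Every incoming item pushes the deadline 200 ms into the future
            // (200 ms is an assumed demo value).
            timer.Change(200, Timeout.Infinite);
            return value;
        });

        timeoutBlock.LinkTo(batchBlock);

        for (int i = 1; i <= 3; i++)
            timeoutBlock.Post(i);

        // Throws TimeoutException if no batch shows up within five seconds.
        int[] batch = await batchBlock.ReceiveAsync(TimeSpan.FromSeconds(5));
        timer.Dispose();
        return batch;
    }

    public static async Task Main()
    {
        int[] batch = await RunAsync();
        Console.WriteLine(string.Join(", ", batch));
    }
}
```

The timer is disposed only after the batch has been received, so the TransformBlock never touches a disposed timer in this sketch. A long-lived pipeline would need its own cleanup strategy.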
answered 2012-02-24T00:52:49.250
6

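Here is a tidied-up version of Drew Marsh's excellent solution. It uses the DataflowBlock.Encapsulate method to create a single dataflow block that encapsulates the timer and batching functionality. Besides the new timeout argument, the CreateBatchBlock method also supports all the options available to the normal BatchBlock constructor.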

public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
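
One thing the method above leaves open is the lifetime of the timer. Below is a hedged sketch of a variant that disposes the timer once the BatchBlock has completed, followed by a small usage example. The names TimedBatchSketch and CreateDisposingBatchBlock are hypothetical, the options parameter is omitted for brevity, and the 200 ms timeout is an assumed demo value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TimedBatchSketch
{
    // Hypothetical variant of CreateBatchBlock: same wiring, plus timer cleanup.
    public static IPropagatorBlock<T, T[]> CreateDisposingBatchBlock<T>(
        int batchSize, int timeout)
    {
        var batchBlock = new BatchBlock<T>(batchSize);
        var timer = new Timer(_ => batchBlock.TriggerBatch());
        var transformBlock = new TransformBlock<T, T>(value =>
        {
            timer.Change(timeout, Timeout.Infinite); // re-arm on every item
            return value;
        });
        transformBlock.LinkTo(batchBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // The BatchBlock completes only after the TransformBlock has finished,
        // so no further timer.Change call can follow the Dispose below.
        batchBlock.Completion.ContinueWith(_ => timer.Dispose(),
            TaskScheduler.Default);

        return DataflowBlock.Encapsulate(transformBlock, batchBlock);
    }

    // Posts five items with a batch size of two and returns the batch lengths.
    public static async Task<int[]> RunAsync()
    {
        var block = CreateDisposingBatchBlock<int>(batchSize: 2, timeout: 200);
        for (int i = 1; i <= 5; i++)
            block.Post(i);

        var sizes = new int[3];
        for (int i = 0; i < sizes.Length; i++)
        {
            int[] batch = await block.ReceiveAsync(TimeSpan.FromSeconds(5));
            sizes[i] = batch.Length;
        }

        block.Complete();
        await block.Completion;
        return sizes;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

In this run the first two batches are full, and the fifth item should arrive on its own once the timeout elapses after the last post.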

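Alternative: below is a BatchUntilInactiveBlock<T> class that offers the whole range of BatchBlock<T> functionality. This implementation is a thin wrapper around a BatchBlock<T> instance. It has less overhead than the previous CreateBatchBlock implementation, while behaving similarly.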

/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to BatchSize, or when a Timeout period has elapsed after receiving the last item.
/// </summary>
public class BatchUntilInactiveBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly Timer _timer;
    private readonly TimeSpan _timeout;

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timer = new Timer(_ => _source.TriggerBatch());
        _timeout = timeout;
    }

    public BatchUntilInactiveBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
            => _source.LinkTo(target, linkOptions);

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            _timer.Change(_timeout, System.Threading.Timeout.InfiniteTimeSpan);
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
answered 2019-12-04T09:02:56.797
4

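Thanks to Drew Marsh for the idea of using a TransformBlock, which helped me a great deal with a recent solution. However, I believe the timer needs to be reset AFTER the batch block (that is, after a batch has been produced, either because the batch size was reached or because TriggerBatch was called explicitly in the timer callback). If you reset the timer every time a single item arrives, it can keep being reset over and over without ever actually triggering a batch, constantly pushing the timer's "dueTime" further away.

That would make the code snippet look like the following: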

// The TimerCallback delegate takes a state argument, hence the discard parameter.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    triggerBatchTimer.Change(5000, Timeout.Infinite);

    return value; 
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
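
To see the difference in behaviour, here is a small self-contained sketch of this arrangement with a slow but steady producer. The class name, the 300 ms timeout, the batch size of 100, and the 50 ms producer delay are assumptions chosen for the demo. One caveat that seems worth stating: if the timer fires while the BatchBlock is empty, no batch flows through the TransformBlock and nothing re-arms the timer, so this sketch deliberately keeps items flowing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MaxLatencyBatchDemo
{
    // Feeds 20 items, one every 50 ms, and returns the batches that were emitted.
    public static async Task<List<int[]>> RunAsync()
    {
        const int timeoutMs = 300; // assumed demo value
        var batches = new List<int[]>();

        var bufferBlock = new BufferBlock<int>();
        var batchBlock = new BatchBlock<int>(100);
        var timer = new Timer(_ => batchBlock.TriggerBatch(), null,
            timeoutMs, Timeout.Infinite);

        // Sits AFTER the BatchBlock: the timer restarts whenever a batch is
        // emitted, not whenever a single item arrives.
        var timeoutBlock = new TransformBlock<int[], int[]>(batch =>
        {
            timer.Change(timeoutMs, Timeout.Infinite);
            return batch;
        });

        // Default ActionBlock runs one delegate at a time, so List.Add is safe.
        var actionBlock = new ActionBlock<int[]>(batch => batches.Add(batch));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        bufferBlock.LinkTo(batchBlock, link);
        batchBlock.LinkTo(timeoutBlock, link);
        timeoutBlock.LinkTo(actionBlock, link);

        for (int i = 0; i < 20; i++)
        {
            bufferBlock.Post(i);
            await Task.Delay(50); // shorter than the timeout, so items keep arriving
        }

        // Completion flows down the chain; the BatchBlock flushes any remainder.
        bufferBlock.Complete();
        await actionBlock.Completion;
        timer.Dispose();
        return batches;
    }

    public static async Task Main()
    {
        foreach (int[] batch in await RunAsync())
            Console.WriteLine(string.Join(", ", batch));
    }
}
```

With a per-item reset, a producer like this one would never let the timer fire. Here the batch size of 100 is never reached, yet batches should still come out roughly every 300 ms. The exact batch boundaries depend on timing.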
answered 2019-12-06T16:47:35.740
-1

You can use link options:

_transformManyBlock.LinkTo(_batchBlock, new DataflowLinkOptions {PropagateCompletion = true});
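
A short sketch of what this option does in practice, with block names and values of my own choosing. As far as I can tell it covers flushing a partial batch when the upstream block completes, not flushing after a timeout: once completion reaches the BatchBlock, it emits whatever is left as a final, smaller batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionDemo
{
    public static async Task<List<int[]>> RunAsync()
    {
        var batches = new List<int[]>();

        // Expands an input n into the sequence 1..n.
        var transformManyBlock =
            new TransformManyBlock<int, int>(n => Enumerable.Range(1, n));
        var batchBlock = new BatchBlock<int>(4);
        var sink = new ActionBlock<int[]>(batch => batches.Add(batch));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        transformManyBlock.LinkTo(batchBlock, link);
        batchBlock.LinkTo(sink, link);

        transformManyBlock.Post(6);    // produces six items
        transformManyBlock.Complete(); // completion flows to batchBlock, then sink

        await sink.Completion;
        return batches;
    }

    public static async Task Main()
    {
        // Prints "1, 2, 3, 4" and then "5, 6".
        foreach (int[] batch in await RunAsync())
            Console.WriteLine(string.Join(", ", batch));
    }
}
```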
answered 2019-12-04T07:21:30.360