
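I'm trying to implement the following behavior using the Task Parallel Library:

When messages arrive, I want to process them sequentially, but in groups. When the first message arrives, it should be processed immediately. If 2 messages come in while the first one is being processed, those 2 should then be processed together as one group.

I can almost get this working by linking a BatchBlock to an ActionBlock: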

var batchBlock = new BatchBlock<int>(100);

var actionBlock = new ActionBlock<int[]>(list =>
    {
        // do work

        // now trigger
        batchBlock.TriggerBatch();
    });

batchBlock.LinkTo(actionBlock);

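The problem with the code above is that if an item arrives after the TriggerBatch() call, it has to wait for the batch to fill up. If I instead trigger the batch after each post, the ActionBlock always receives single messages.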


2 Answers


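Instead of BatchBlock, you could use a BufferBlock and a Task that receives items from it and re-sends them to the target in batches, according to your logic. Because you need to try sending a message that contains a batch, and cancel that send if another item comes in, the target block (actionBlock in your example) must have BoundedCapacity set to 1.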
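The reason BoundedCapacity = 1 matters can be shown in isolation. The sketch below (assuming the System.Threading.Tasks.Dataflow library; the class and method names are hypothetical) keeps an ActionBlock busy with one batch, then calls SendAsync with a second batch. Because the block will not buffer anything beyond the batch it is processing, the second SendAsync stays pending, and cancelling its token withdraws the batch so it could be enlarged and re-sent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedTargetDemo
{
    // Hypothetical demo: returns true if a SendAsync to a busy target
    // with BoundedCapacity = 1 stays pending and can then be cancelled.
    public static async Task<bool> PendingSendCanBeCancelledAsync()
    {
        // Keeps the first batch "in progress" until we release it.
        var release = new TaskCompletionSource<bool>();

        // BoundedCapacity = 1: the batch currently being processed fills the block.
        var target = new ActionBlock<IReadOnlyList<int>>(
            batch => release.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        IReadOnlyList<int> firstBatch = new[] { 1 };
        IReadOnlyList<int> secondBatch = new[] { 2, 3 };

        // The first batch is accepted and starts processing.
        bool firstAccepted = await target.SendAsync(firstBatch);

        // The block is full, so this send is postponed rather than buffered.
        var cts = new CancellationTokenSource();
        Task<bool> secondSend = target.SendAsync(secondBatch, cts.Token);
        bool pendingBeforeCancel = !secondSend.IsCompleted;

        // Cancelling withdraws the offer; awaiting it throws TaskCanceledException.
        cts.Cancel();
        bool cancelled;
        try
        {
            await secondSend;
            cancelled = false;
        }
        catch (OperationCanceledException)
        {
            cancelled = true;
        }

        // Let the first batch finish and shut the block down.
        release.SetResult(true);
        target.Complete();
        await target.Completion;

        return firstAccepted && pendingBeforeCancel && cancelled;
    }
}
```

Without the bound, the second SendAsync would complete immediately because the block would simply queue the batch, leaving nothing to cancel.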

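So what you do is first receive something. Once you have it, you start sending it asynchronously, and at the same time you try to receive more items. If the send completes first, you start over. If the receive completes first, you cancel the send, add the received items to the batch, and then start both asynchronous operations again.

The actual code is a bit more complicated, because it has to handle some corner cases (the receive and the send completing at the same time; the send not being cancellable; the receive completing because the whole block completed; exceptions):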

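using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// All of the methods below belong in a non-nested static class,
// because EnsureCancelled and ReceiveAllInto are extension methods.
public static ITargetBlock<T> CreateBatchingWrapper<T>(
    ITargetBlock<IReadOnlyList<T>> target)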
{
    // target should have BoundedCapacity == 1,
    // but there is no way to check for that

    var source = new BufferBlock<T>();

    Task.Run(() => BatchItems(source, target));

    return source;
}

private static async Task BatchItems<T>(
    IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target)
{
    try
    {
        while (true)
        {
            var messages = new List<T>();

            // wait for first message in batch
            if (!await source.OutputAvailableAsync())
            {
                // source was completed, complete target and return
                target.Complete();
                return;
            }

            // receive all there is right now
            source.ReceiveAllInto(messages);

            // try sending what we've got
            var sendCancellation = new CancellationTokenSource();
            var sendTask = target.SendAsync(messages, sendCancellation.Token);

            var outputAvailableTask = source.OutputAvailableAsync();

            while (true)
            {
                await Task.WhenAny(sendTask, outputAvailableTask);

                // got another message, try cancelling send
                if (outputAvailableTask.IsCompleted
                    && outputAvailableTask.Result)
                {
                    sendCancellation.Cancel();

                    // cancellation wasn't successful
                    // and the message was received, start another batch
                    if (!await sendTask.EnsureCancelled() && sendTask.Result)
                        break;

                    // send was cancelled, receive messages
                    source.ReceiveAllInto(messages);

                    // restart both Tasks
                    sendCancellation = new CancellationTokenSource();
                    sendTask = target.SendAsync(
                        messages, sendCancellation.Token);
                    outputAvailableTask = source.OutputAvailableAsync();
                }
                else
                {
                    // we get here in three situations:
                    // 1. send was completed successfully
                    // 2. send failed
                    // 3. input has completed
                    // in cases 2 and 3, this await is necessary
                    // in case 1, it's harmless
                    await sendTask;

                    break;
                }
            }
        }
    }
    catch (Exception e)
    {
        source.Fault(e);
        target.Fault(e);
    }
}

/// <summary>
/// Returns a Task that completes when the given Task completes.
/// The Result is true if the Task was cancelled,
/// and false if it completed successfully.
/// If the Task was faulted, the returned Task is faulted too.
/// </summary>
public static Task<bool> EnsureCancelled(this Task task)
{
    return task.ContinueWith(t =>
    {
        if (t.IsCanceled)
            return true;
        if (t.IsFaulted)
        {
            // rethrow the exception
            ExceptionDispatchInfo.Capture(task.Exception.InnerException)
                .Throw();
        }

        // completed successfully
        return false;
    });
}

public static void ReceiveAllInto<T>(
    this IReceivableSourceBlock<T> source, List<T> targetCollection)
{
    // TryReceiveAll would be best suited for this, except it's bugged
    // (see http://connect.microsoft.com/VisualStudio/feedback/details/785185)
    T item;
    while (source.TryReceive(out item))
        targetCollection.Add(item);
}
answered 2013-12-20T20:59:38.973

You could also use a timer that triggers the batch periodically, for example every 10 seconds.
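A minimal sketch of the timer idea, assuming a System.Threading.Timer that calls TriggerBatch() on the question's BatchBlock<int>(100); the class name, the RunAsync helper and the three test posts are hypothetical. A partial batch is flushed on every tick instead of waiting for 100 items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TimerTriggeredBatching
{
    // Hypothetical helper: posts three items, lets the timer flush them,
    // and returns the sizes of the batches the ActionBlock saw before completion.
    public static async Task<int[]> RunAsync(TimeSpan period)
    {
        var received = new ConcurrentQueue<int>();
        var batchBlock = new BatchBlock<int>(100);
        var actionBlock = new ActionBlock<int[]>(batch =>
        {
            // do work; here we only record the batch size
            received.Enqueue(batch.Length);
        });
        batchBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        int[] beforeCompletion;
        // Every 'period', emit whatever has accumulated, even if fewer than 100 items.
        using (var timer = new Timer(_ => batchBlock.TriggerBatch(), null, period, period))
        {
            batchBlock.Post(1);
            batchBlock.Post(2);
            batchBlock.Post(3);

            // Give the timer a few ticks to flush the partial batch.
            await Task.Delay(TimeSpan.FromMilliseconds(period.TotalMilliseconds * 4));
            beforeCompletion = received.ToArray();
        }

        batchBlock.Complete();
        await actionBlock.Completion;
        return beforeCompletion;
    }
}
```

In the question's setup the period would be TimeSpan.FromSeconds(10). The trade-off is that a message arriving just after a tick can wait up to a full period, so a timer bounds the latency rather than giving the "process immediately when idle" behavior the question asks for.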

answered 2014-01-08T04:34:40.067