7

使用 TPL 数据流库,我想做这样的事情:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

看来 ActionBlock 上的取消令牌取消了整个事情;如果我设置它,我必须制作一个新的 ActionBlock。是否可以使用 ActionBlock 进行部分取消?

不应尝试尚未处理的帖子。如果有一些取消令牌可用于签入当前正在执行的帖子,那就太好了。

4

3 回答 3

4

看一下BroadcastBlock<T>,它仅包含发布到它的最新项目。您可以将广播块放在ActionBlock<T>.

虽然向广播块发布新项目不会取消当前正在由操作块处理的项目,但它会覆盖广播块已经持有的任何现有项目;实际上丢弃任何尚未被操作块处理的旧消息。当动作块完成其当前项目时,它将获取发布到广播块的最新项目。

于 2014-02-24T02:04:34.667 回答
2

除了 Monroe Thomas 的回答之外,重要的是要了解 BroadcastBlock 之后的 ActionBlock 需要将其BoundedCapacity 限制为 1,否则它将存储和处理广播块的每条消息,即使它仍在执行。
这里有一个代码示例:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
  await Task.Delay(100);
  Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });

for(int IX = 0; IX < 128; IX++)
{
  await ThrottleBlock.SendAsync(IX);
  await Task.Delay(10);
}

这导致以下结果:

>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127

享受!
-西蒙

于 2018-01-02T14:56:52.020 回答
1

在 TPL 数据流中没有直接这样的东西,但我可以看到几种方法可以自己实现它:

  1. 如果您不需要能够将修改后的块视为普通数据流块(例如不支持LinkTo()),那么一种简单的方法是编写一个 wraps 的类型ActionBlock,但其项目还包含一个标志,表明它们是否应该被处理。当您指定 时cancelAllPreviousPosts: true,所有这些标志都将被重置,因此将跳过这些项目。

    代码可能如下所示:

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. 如果您想创建更具可组合性和可重用性的东西,您可以为取消创建一个特殊块。您可以使用它们BufferBlock链接在一起来实现这一点,其中第三个的容量为 1,第二个的容量为无限。这样,几乎所有排队的项目都将位于第二个块中,因此您只需将该块交换为新块即可执行取消。整个结构将由Encapsulate()第一个和第三个块表示。

    这种方法的问题是取消延迟了 1 个项目(第三个块中的那个)。另外,我没有为此找到一个好的界面。

于 2014-02-26T14:19:20.357 回答