使用 TPL 数据流库,我想做这样的事情:
myActionBlock.Post(newValue, cancelAllPreviousPosts: true);
看来 ActionBlock 上的取消令牌取消了整个事情;如果我设置它,我必须制作一个新的 ActionBlock。是否可以使用 ActionBlock 进行部分取消?
不应尝试尚未处理的帖子。如果有一些取消令牌可用于签入当前正在执行的帖子,那就太好了。
使用 TPL 数据流库,我想做这样的事情:
myActionBlock.Post(newValue, cancelAllPreviousPosts: true);
看来 ActionBlock 上的取消令牌取消了整个事情;如果我设置它,我必须制作一个新的 ActionBlock。是否可以使用 ActionBlock 进行部分取消?
不应尝试尚未处理的帖子。如果有一些取消令牌可用于签入当前正在执行的帖子,那就太好了。
看一下BroadcastBlock<T>
,它仅包含发布到它的最新项目。您可以将广播块放在ActionBlock<T>
.
虽然向广播块发布新项目不会取消当前正在由操作块处理的项目,但它会覆盖广播块已经持有的任何现有项目;实际上丢弃任何尚未被操作块处理的旧消息。当动作块完成其当前项目时,它将获取发布到广播块的最新项目。
除了 Monroe Thomas 的回答之外,重要的是要了解 BroadcastBlock 之后的 ActionBlock 需要将其BoundedCapacity 限制为 1,否则它将存储和处理广播块的每条消息,即使它仍在执行。
这里有一个代码示例:
ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
await Task.Delay(100);
Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });
for(int IX = 0; IX < 128; IX++)
{
await ThrottleBlock.SendAsync(IX);
await Task.Delay(10);
}
这导致以下结果:
>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127
享受!
-西蒙
在 TPL 数据流中没有直接这样的东西,但我可以看到几种方法可以自己实现它:
如果您不需要能够将修改后的块视为普通数据流块(例如不支持LinkTo()
),那么一种简单的方法是编写一个 wraps 的类型ActionBlock
,但其项目还包含一个标志,表明它们是否应该被处理。当您指定 时cancelAllPreviousPosts: true
,所有这些标志都将被重置,因此将跳过这些项目。
代码可能如下所示:
class CancellableActionBlock<T>
{
private class Item
{
public T Data { get; private set; }
public bool ShouldProcess { get; set; }
public Item(T data)
{
Data = data;
ShouldProcess = true;
}
}
private readonly ActionBlock<Item> actionBlock;
private readonly ConcurrentDictionary<Item, bool> itemSet;
public CancellableActionBlock(Action<T> action)
{
itemSet = new ConcurrentDictionary<Item, bool>();
actionBlock = new ActionBlock<Item>(item =>
{
bool ignored;
itemSet.TryRemove(item, out ignored);
if (item.ShouldProcess)
{
action(item.Data);
}
});
}
public bool Post(T data, bool cancelAllPreviousPosts = false)
{
if (cancelAllPreviousPosts)
{
foreach (var item in itemSet.Keys)
{
item.ShouldProcess = false;
}
itemSet.Clear();
}
var newItem = new Item(data);
itemSet.TryAdd(newItem, true);
return actionBlock.Post(newItem);
}
// probably other members that wrap actionBlock members,
// like Complete() and Completion
}
如果您想创建更具可组合性和可重用性的东西,您可以为取消创建一个特殊块。您可以使用它们BufferBlock
链接在一起来实现这一点,其中第三个的容量为 1,第二个的容量为无限。这样,几乎所有排队的项目都将位于第二个块中,因此您只需将该块交换为新块即可执行取消。整个结构将由Encapsulate()
第一个和第三个块表示。
这种方法的问题是取消延迟了 1 个项目(第三个块中的那个)。另外,我没有为此找到一个好的界面。