我想知道使用链接到一个或多个 ActionBlock 的 BufferBlock 是否有好处,除了节流(使用 BoundedCapacity),而不是直接发布到 ActionBlock(只要不需要节流)。
3 回答
如果您只想将项目从一个块转发到其他几个块,则不需要BufferBlock
.
但肯定有它有用的情况。例如,如果您有一个复杂的数据流网络,您可能希望从较小的子网络构建它,每个子网络都以自己的方法创建。为此,您需要某种方式来表示一组块。在您提到的情况下,从方法中返回该单曲BufferBlock
(可能为ITargetBlock
)将是一个简单的解决方案。
另一个BufferBlock
有用的示例是,如果您想将项目从多个源块发送到多个目标块。如果您用作BufferBlock
中介,则不必将每个源块连接到每个目标块。
我敢肯定还有许多其他示例可以使用BufferBlock
. 当然,如果你看不出有任何理由在你的情况下使用它,那就不要。
为了补充 svick 的答案,缓冲块还有另一个好处。如果您有一个具有多个输出链接的块并希望在它们之间保持平衡,则必须将输出块变为有界容量 1 并添加一个缓冲块来处理排队。
这是我们计划做的:
- 一些代码块将使用它的 Post(T t) 方法将数据发布到 BufferBlock。
- 此 BufferBlock 使用 BufferBlock 的 LinkTo t) 方法链接到 3 个 ActionBlock 实例。
请注意,BufferBlock 不会将输入数据的副本移交给它链接到的所有目标块。相反,它只对一个目标块这样做。这里我们期望当一个目标忙于处理请求时。它将被移交给另一个目标。现在让我们参考下面的代码:
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
});
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
});
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
});
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
}
);
t.Start();
Console.Read();
}
执行时会产生以下输出:
- 以值 1 执行的动作 A1
- 以值 2 执行的动作 A1
- 以值 3 执行的动作 A1
- 以值 4 执行的动作 A1
- 以值 5 执行的动作 A1
- 以值 6 执行的动作 A1
- 以值 7 执行的动作 A1
- 以值 8 执行的动作 A1
- 以值 9 执行的动作 A1
- 以值 10 执行的动作 A1
这表明只有一个目标实际上在执行所有数据,即使它很忙(由于有意添加的 Thread.Sleep(100))。为什么?
这是因为默认情况下,所有目标块本质上都是贪婪的,并且即使它们无法处理数据也会缓冲输入。为了改变这种行为,我们在初始化 ActionBlock 时将 DataFlowBlockOptions 中的 Bounded Capacity 设置为 1,如下所示。
static void Main(string[] args)
{
BufferBlock<int> bb = new BufferBlock<int>();
ActionBlock<int> a1 = new ActionBlock<int>(a =>
{
Thread.Sleep(100);
Console.WriteLine("Action A1 executing with value {0}", a);
}
, new ExecutionDataflowBlockOptions {BoundedCapacity = 1});
ActionBlock<int> a2 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A2 executing with value {0}", a);
}
, new ExecutionDataflowBlockOptions {BoundedCapacity = 1});
ActionBlock<int> a3 = new ActionBlock<int>(a =>
{
Thread.Sleep(50);
Console.WriteLine("Action A3 executing with value {0}", a);
}
, new ExecutionDataflowBlockOptions {BoundedCapacity = 1});
bb.LinkTo(a1);
bb.LinkTo(a2);
bb.LinkTo(a3);
Task t = new Task(() =>
{
int i = 0;
while (i < 10)
{
Thread.Sleep(50);
i++;
bb.Post(i);
}
});
t.Start();
Console.Read();
}
这个程序的输出是:
- 以值 1 执行的动作 A1
- 以值 3 执行的动作 A2
- 以值 2 执行的动作 A1
- 以值 6 执行的动作 A3
- 以值 7 执行的动作 A3
- 以值 8 执行的动作 A3
- 以值 5 执行的动作 A2
- 以值 9 执行的动作 A3
- 以值 4 执行的动作 A1
- 以值 10 执行的动作 A2
这显然是数据在三个 ActionBlock(s) 中的预期分布。
不,第二个示例由于多种原因无法编译: 只能为“分组”数据流块设置 greedy=false - 而不是为执行块设置;然后必须通过 GroupingDataflowBlockOptions 设置 - 而不是 DataflowBlockOptions;然后将其设置为属性值“{ Greedy = false }”而不是构造函数参数。
如果要限制操作块的容量,请通过设置 DataflowBlockOptions 的 BoundedCapacity 属性的值来实现(尽管正如 OP 所述,他们已经知道此选项)。像这样:
var a1 = new ActionBlock<int>(
i => doSomeWork(i),
new ExecutionDataflowBlockOptions {BoundedCapacity = 1}
);