在玩过数据流之后,我遇到了一个新问题。我想限制所有块的输入队列。我的生产块 (ActionBlock) 正在快速创建 5000 个元素并将它们发布到广播块。因此,如果我将广播块的 BoundedCapacity 设置为 100,他会丢弃大量数据。但我更希望生产块等待我的缓冲区块的输入队列中的新插槽。
有没有办法摆脱这个问题?
在玩过数据流之后,我遇到了一个新问题。我想限制所有块的输入队列。我的生产块 (ActionBlock) 正在快速创建 5000 个元素并将它们发布到广播块。因此,如果我将广播块的 BoundedCapacity 设置为 100,他会丢弃大量数据。但我更希望生产块等待我的缓冲区块的输入队列中的新插槽。
有没有办法摆脱这个问题?
这正是BufferBlock
它的用途。如果您设置它BoundedCapacity
并且它已满,它将推迟接收任何消息,直到有人使用它们。这意味着例如Post()
将阻塞并SendAsync()
返回一个 unfinished Task
。
编辑:没有内置块可以发送到多个目标并且永远不会丢弃数据。但是您可以轻松地自己构建一个ActionBlock
并发送循环:
static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
{
var targetsList = targets.ToList();
var block = new ActionBlock<T>(
async item =>
{
foreach (var target in targetsList)
{
await target.SendAsync(item);
}
},
new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });
// TODO: propagate completion from block to targets
return block;
}
此代码假定您不需要为每个目标克隆数据并且目标列表永远不会更改。为此修改代码应该相当简单。