与其闯入BufferBlock
,不如将 aTransformManyBlock
插入为您执行此操作的链中?您可以使用 a HashSet
,该Add
方法仅true
在尚未添加项目时返回。它最终变得非常简单,但存储需求显然会随着时间的推移而增加......
void Main()
{
var bb = new BufferBlock<string>();
var db = DataflowEx.CreateDistinctBlock<string>();
var ab = new ActionBlock<string>(x => Console.WriteLine(x));
bb.LinkTo(db);
db.LinkTo(ab);
bb.Post("this");
bb.Post("this");
bb.Post("this");
bb.Post("is");
bb.Post("is");
bb.Post("a");
bb.Post("test");
}
public class DataflowEx
{
public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
{
var hs = new HashSet<T>();
//hs will be captured in the closure of the delegate
//supplied to the TransformManyBlock below and therefore
//will have the same lifespan as the returned block.
//Look up the term "c# closure" for more info
return new TransformManyBlock<T, T>(
x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
}
}
这样做的原因是,就像 Linq 的 SelectMany 一样,TransformManyBlock 有效地展平了列表列表。因此,TransformManyBlock 接受一个返回 的委托IEnumerable<T>
,但一次提供返回的项目中的项目IEnumerable<T>
。通过返回IEnumerable<T>
其中包含 0 或 1 个项目的一个,我们可以有效地创建Where
类似行为,根据是否满足某个谓词,允许一个项目通过或阻止它通过。在这种情况下,谓词是我们是否可以将项目添加到捕获的 HashSet。