3

我最近开始使用 .NET 4.5 的 TPL Dataflow 库,块的整个概念对我来说是新的。我在我的应用程序中实现了一个生产者-消费者队列,我需要防止重复的消息被放入队列,因此需要检查消息是否已经排队。我正在使用一种BufferBlock<Message>类型(Message是一种自定义类型)。BufferBlock具有 Count 属性,但这对这个问题没有帮助,因为需要唯一标识消息。

有没有办法检查 a 是否BufferBlock包含一个项目或检查所有项目并检查它们?是否可以转换BufferBlock为允许对项目进行迭代的东西?我正在关注我在 MSDN 上看到的一个示例,它不检查项目是否在队列中,但我认为检查队列的内容是一项相当需要的操作。任何帮助表示赞赏。

4

1 回答 1

5

与其闯入BufferBlock,不如将 aTransformManyBlock插入为您执行此操作的链中?您可以使用 a HashSet,该Add方法仅true在尚未添加项目时返回。它最终变得非常简单,但存储需求显然会随着时间的推移而增加......

void Main()
{
    var bb = new BufferBlock<string>();
    var db = DataflowEx.CreateDistinctBlock<string>();
    var ab = new ActionBlock<string>(x => Console.WriteLine(x));
    bb.LinkTo(db);
    db.LinkTo(ab);
    bb.Post("this");
    bb.Post("this");
    bb.Post("this");
    bb.Post("is");
    bb.Post("is");
    bb.Post("a");
    bb.Post("test");
}

public class DataflowEx
{
    public static TransformManyBlock<T, T> CreateDistinctBlock<T>()
    {
        var hs = new HashSet<T>();
        //hs will be captured in the closure of the delegate
        //supplied to the TransformManyBlock below and therefore
        //will have the same lifespan as the returned block.
        //Look up the term "c# closure" for more info
        return new TransformManyBlock<T, T>(
                         x => Enumerable.Repeat(x, hs.Add(x) ? 1 : 0));
    }
}

这样做的原因是,就像 Linq 的 SelectMany 一样,TransformManyBlock 有效地展平了列表列表。因此,TransformManyBlock 接受一个返回 的委托IEnumerable<T>,但一次提供返回的项目中的项目IEnumerable<T>。通过返回IEnumerable<T>其中包含 0 或 1 个项目的一个,我们可以有效地创建Where类似行为,根据是否满足某个谓词,允许一个项目通过或阻止它通过。在这种情况下,谓词是我们是否可以将项目添加到捕获的 HashSet。

于 2013-12-28T00:35:59.317 回答