6

我使用 aBlockingCollection在 C# 4.0 中实现生产者-消费者模式。

BlockingCollection存放占用大量内存的项目。我想让生产者一次从 BlockingCollection 中取出一个项目,然后处理它。

我在想,通过使用 foreach on BlockingCollection.GetConsumingEnumerable(),每次BlockingCollection都会从底层队列中删除项目(这意味着所有与引用一起),因此在处理项目的 Process() 方法结束时,项目可能是垃圾集。

但是这是错误的。似乎 foreach 循环BlockingCollection.GetConsumingEnumerable()确实保存了进入队列的项目的所有引用。在退出 foreach 循环之前,所有项目都被保留(从而防止被垃圾收集)。

我没有使用简单的 foreach 循环 on BlockingCollection.GetConsumingEnumerable(),而是使用 while 循环测试 BlockingCollection.IsComplete 标志,并在我BlockingCollection.Take()用来抓取消耗品的循环内。我会假设它BlockingCollection.Take()具有与 类似的效果List.Remove(),它将从 BlockingCollection 中删除项目的引用。但这又是错误的。所有项目仅在 while 循环之外进行垃圾收集。

所以我的问题是,我们如何才能轻松实现 BlockingCollection 可能包含消耗内存的项目并且每个项目一旦被消费者消费就可以被垃圾回收的要求?非常感谢您的帮助。

编辑:根据要求,添加了一个简单的演示代码:

// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get{ return id_; } }
    public Entity() { id_ = counter++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}

...

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();

// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity);
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity);
    Task.WaitAll(tasks_.ToArray());
}

// The producer creates Entity instances and add them to BlockingCollection.
void ProduceEntity()
{
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}

// The consumer takes entity, process it (and what I need: destroy it).
void ConsumeEntity()
{
    while(!jobQueue_.IsCompleted){
        Entity entity;
        if(jobQueue_.TryTake(entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;

            // I would assume after GC, the entity will be finalized and garbage collected, but NOT.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}

输出是所有创建和处理消息,后面是“完成处理”。然后是来自实体的所有销毁消息。创建实体消息显示 Entity.ID 从 0 到 9,销毁消息显示 Entity.ID 从 9 到 0。

编辑:

即使我设置了 BlockingCollection 的绑定容量,所有进入它的项目也只有在循环退出时才会最终确定,这很奇怪。

4

3 回答 3

6

ConcurrentQueue 包含具有 32 个项目的内部数组的段。在段被垃圾收集之前,实体项目不会被垃圾收集。这将在所有 32 个项目从队列中取出后发生。如果您更改示例以添加 32 个项目,您将在“完成处理”之前看到“销毁实体”消息。

于 2010-11-08T12:23:01.697 回答
2

BlockingCollection 是否继续持有引用取决于它使用的集合类型。

默认集合BlockingCollection<T>类型是ConcurrentQueue<T>.

所以垃圾收集行为将取决于收集类型。在ConcurrentQueue<T> 的情况下,这是一个 FIFO 结构,所以如果在将引用从队列中删除后没有从数据结构中释放引用,我会感到非常惊讶(这是一种队列的定义)!

您如何确定对象没有被垃圾收集?

于 2010-10-21T05:54:28.440 回答