0

我有三个任务,一个是生产者,然后是消费者,最后一个是在完成前两个任务后打印一些东西。但是代码没有到达最后一个任务,这意味着没有打印。

 while (true)
            {
                ThreadEvent.WaitOne(waitingTime, false);

                lock (SyncVar)
                {
                    collection = new BlockingCollection<string>(4);
                    Task producer = Task.Run(() =>
                     {
                         if (list.Count > 0)
                             Console.WriteLine("Block begin");
                         while (!collection.IsAddingCompleted)
                         {
                             var firstItem = list.FirstOrDefault();
                             collection.TryAdd(firstItem);
                             list.Remove(firstItem);
                         }
                         collection.CompleteAdding();
                     });
                    Task consumer = Task.Run(() => DoConsume());
                    Task endTask = consumer.ContinueWith(i => Console.WriteLine("Block end"));// not print this line, why?
                    Task.WaitAll(producer, consumer, endTask);
                    if (ThreadState != State.Running) break;
                }
            }

请看我的代码逻辑。

编辑:

对于“DoConsume”,它很复杂。

public void DoConsume()
    {
        if (collection.Count > 0)
            Console.WriteLine("There are {0} channels to be processed.", collection.Count);

        var workItemBlock = new ActionBlock<string>(
        workItem =>
        {
            bool result =ProcessEachChannel(workItem);
        });

        foreach (var workItem in collection.GetConsumingEnumerable())
        {
             workItemBlock.Post(workItem);
        }

        workItemBlock.Complete();
    }
4

2 回答 2

0

问题是您的生产者永远不会完成:

// This will run until after CompleteAdding is called
while (!collection.IsAddingCompleted)
{
    var firstItem = list.FirstOrDefault();
    collection.TryAdd(firstItem);
    list.Remove(firstItem);
}
//... which doesn't happen until after the loop
collection.CompleteAdding();

看起来您只是想添加列表中的所有项目,这应该很简单:

Task producer = Task.Run(() =>
{
    if (list.Count > 0)
        Console.WriteLine("Block begin");
    while(list.Any())
    {
        var firstItem = list.First();
        collection.TryAdd(firstItem);
        list.Remove(firstItem);
    }
    collection.CompleteAdding();
});

或者,更简单的方法:

Task producer = Task.Run(() =>
{
    if (list.Count > 0)
        Console.WriteLine("Block begin");
    foreach(var item in list)
    {
        collection.TryAdd(item);
    }
    list.Clear();
    collection.CompleteAdding();
});
于 2014-08-11T20:52:10.530 回答
0

我使用了 Reed Copsey 的代码,但错误仍然存​​在。只是想不通为什么。我认为我的代码在while (!collection.IsAddingCompleted).

因为集合的边界是 4,所以假设集合中还剩下两个项目。条件collection.IsAddingCompleted永远不会满足,因此代码无法跳出 while 循环。

我重写了代码,看起来不错。代码类似于MSDN。我曾经Take检索集合中的元素。

while (true)
{
    ThreadEvent.WaitOne(waitingTime, false);

    lock (SyncVar)
    {
        collection = new BlockingCollection<string>(4);
        DoWork dc = new DoWork();
        Task consumer = Task.Run(() =>
        {
            while (!collection.IsCompleted)
            {
                string data = "";
                try
                {
                    if (collection.Count > 0)
                        data = collection.Take();
                }
                catch (InvalidOperationException e)
                {
                    Console.WriteLine(e.Message);
                }
                if (data != "")
                {
                    bool result = dc.DoConsume(data);
                }
            }
        });

        Task producer = Task.Run(() =>
         {
             if (list.Count > 0)
                 Console.WriteLine("Block begin");
             foreach (var item in list)
             {
                 collection.Add(item);
             }
             list.Clear();

             collection.CompleteAdding();
         });
        Task endTask = consumer.ContinueWith(i => Console.WriteLine("Block end"));
        Task.WaitAll(producer, consumer, endTask);
        if (ThreadState != State.Running) break;
    }
于 2014-08-12T15:59:36.160 回答