32

我想使用这样的并行循环处理一些东西:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });

}

好的,它工作正常。但是如果我希望 FillLogs 方法返回一个 IEnumerable 怎么办?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt // KO, don't work
    });

}

编辑

这似乎是不可能的......但我使用这样的东西:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

但是我把cpt.Logs = cpt.GetRawLogs().ToList();指令放在哪里

4

5 回答 5

16

Short version - no, that isn't possible via an iterator block; the longer version probably involves synchronized queue/dequeue between the caller's iterator thread (doing the dequeue) and the parallel workers (doing the enqueue); but as a side note - logs are usually IO-bound, and parallelising things that are IO-bound often doesn't work very well.

If the caller is going to take some time to consume each, then there may be some merit to an approach that only processes one log at a time, but can do that while the caller is consuming the previous log; i.e. it begins a Task for the next item before the yield, and waits for completion after the yield... but that is again, pretty complex. As a simplified example:

static void Main()
{
    foreach(string s in Get())
    {
        Console.WriteLine(s);
    }
}

static IEnumerable<string> Get() {
    var source = new[] {1, 2, 3, 4, 5};
    Task<string> outstandingItem = null;
    Func<object, string> transform = x => ProcessItem((int) x);
    foreach(var item in source)
    {
        var tmp = outstandingItem;

        // note: passed in as "state", not captured, so not a foreach/capture bug
        outstandingItem = new Task<string>(transform, item);
        outstandingItem.Start();

        if (tmp != null) yield return tmp.Result;
    }
    if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
    return i.ToString();
}
于 2011-12-07T09:29:17.970 回答
5

I don't want to be offensive, but maybe there is a lack of understanding. Parallel.ForEach means that the TPL will run the foreach according to the available hardware in several threads. But that means, that ii is possible to do that work in parallel! yield return gives you the opportunity to get some values out of a list (or what-so-ever) and give them back one-by-one as they are needed. It prevents of the need to first find all items matching the condition and then iterate over them. That is indeed a performance advantage, but can't be done in parallel.

于 2011-12-07T09:28:39.303 回答
0

How about

            Queue<string> qu = new Queue<string>();
            bool finished = false;
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(get_list(), (item) =>
                {
                    string itemToReturn = heavyWorkOnItem(item);         
                    lock (qu)
                       qu.Enqueue(itemToReturn );                        
                });
                finished = true;
            });

            while (!finished)
            {
                lock (qu)
                    while (qu.Count > 0)
                        yield return qu.Dequeue();
                //maybe a thread sleep here?
            }

Edit: I think this is better:

        public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
        {
            ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
            bool finished = false;
            AutoResetEvent re = new AutoResetEvent(false);
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(source, (item) =>
                {
                    qu.Enqueue(func(item));
                    re.Set();
                });
                finished = true;
                re.Set();
            });

            while (!finished)
            {
                re.WaitOne();
                while (qu.Count > 0)
                {
                    TOutput res;
                    if (qu.TryDequeue(out res))
                        yield return res;
                }
            }
        }   

Edit2: I agree with the short No answer. This code is useless; you cannot break the yield loop.

于 2017-08-10T05:24:13.430 回答
0

Although the question is old I've managed to do something just for fun.

class Program
{
    static void Main(string[] args)
    {
        foreach (var message in GetMessages())
        {
            Console.WriteLine(message);
        }
    }


    // Parallel yield
    private static IEnumerable<string> GetMessages()
    {
        int total = 0;
        bool completed = false;
        var batches = Enumerable.Range(1, 100).Select(i => new Computer() { Id = i });
        var qu = new ConcurrentQueue<Computer>();
        Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(batches,
                    () => 0,
                    (item, loop, subtotal) =>
                    {
                        Thread.Sleep(1000);
                        qu.Enqueue(item);
                        return subtotal + 1;
                    },
                    result => Interlocked.Add(ref total, result));
            }
            finally
            {
                completed = true;
            }
        });

        int current = 0;
        while (current < total || !completed)
        {
            SpinWait.SpinUntil(() => current < total || completed);
            if (current == total) yield break;
            current++;
            qu.TryDequeue(out Computer computer);
            yield return $"Completed {computer.Id}";
        }
    }
}

public class Computer
{
    public int Id { get; set; }
}

Compared to Koray's answer this one really uses all the CPU cores.

于 2020-01-28T09:51:03.350 回答
0

You can use the following extension method

public static class ParallelExtensions
{
    public static IEnumerable<T1> OrderedParallel<T, T1>(this IEnumerable<T> list, Func<T, T1> action)
    {
        var unorderedResult = new ConcurrentBag<(long, T1)>();
        Parallel.ForEach(list, (o, state, i) =>
        {
            unorderedResult.Add((i, action.Invoke(o)));
        });
        var ordered = unorderedResult.OrderBy(o => o.Item1);
        return ordered.Select(o => o.Item2);
    }
}

use like:

public void FillLogs(IEnumerable<IComputer> computers)
{
    cpt.Logs = computers.OrderedParallel(o => o.GetRawLogs()).ToList();
}

Hope this will save you some time.

于 2020-03-14T12:05:41.980 回答