5

我正在使用第三方库来迭代一些非常大的平面文件,这可能需要几分钟。该库提供了一个枚举器,因此您可以在枚举器提取平面文件中的下一项时产生每个结果并对其进行处理。

例如:

// Lazily streams items out of the flat file: each item is yielded as soon
// as the third-party cursor fetches it, so callers can start processing
// while extraction is still in progress.
IEnumerable<object> GetItems()
{
    var cursor = new Cursor(); // was "new Cursor" — a constructor call needs parentheses

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            // Yield the item the cursor is currently positioned on.
            // (The original "yield return new //object;" was not valid C#.)
            yield return cursor.Current;

            cursor.MoveNext();
        }
    }
    finally
    {
        // Runs when enumeration finishes OR the enumerator is disposed early,
        // so the cursor is always released.
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}

我想要实现的是让两个消费者共用同一个 Enumerable,这样我就不必提取信息两次,同时每个消费者仍然可以在每个项目到达时立即处理它,而不必等到所有数据都提取完毕。

IEnumerable<object> items = GetItems();

// Thread's constructor takes a delegate (ThreadStart), not the result of
// calling the method — wrap each call in a lambda. Also fixed the
// "SaveToDateBase" typo.
new Thread(() => SaveToDatabase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();

我想我想要达到的目标是

“如果消费者要求的项目已经被提取,那么就放弃它,否则移动下一步并等待”,但我意识到两个线程之间可能存在 MoveNext() 冲突。

这样的功能是否已经存在?如果没有,关于如何实现它有什么想法吗?

谢谢

4

2 回答 2

5

您正在寻找的是使用 .NET 4 的 BlockingCollection&lt;T&gt; 和 TPL 任务实现的管道模式。请参阅此 StackOverflow 帖子中的答案以获取完整示例。

示例:3 个同时使用的消费者

// Shared producer/consumer queue. GetConsumingEnumerable() blocks until
// items arrive and only completes after CompleteAdding() has been called.
BlockingCollection<string> queue = new BlockingCollection<string>();

// Starts one producer and three concurrent consumers, then waits for the
// whole pipeline to drain.
public void Start()
{
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
}

private void ProducerImpl()
{
    // 1. Read raw data from a file
    // 2. Preprocess it
    // 3. Add the item to the queue
    queue.Add(item);

    // Signal that no more items will be added. Without this,
    // GetConsumingEnumerable() never completes and Task.WaitAll
    // in Start() blocks forever.
    queue.CompleteAdding();
}

// ConsumerImpl must be thread safe
// to allow launching multiple consumers simultaneously.
private void ConsumerImpl()
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        // TODO: process item
    }
}

如果还有什么不清楚的地方,请告诉我。

管道流程的高级图:

在此处输入图像描述

于 2012-10-26T14:35:29.787 回答
3

本质上,您想要的是缓存 IEnumerable&lt;T&gt; 的数据,但不必等它全部提取完成后才开始使用。你可以这样做:

/// <summary>
/// Wraps <paramref name="source"/> in a caching enumerable so that multiple
/// consumers can enumerate it concurrently while each item is pulled from
/// the underlying sequence only once.
/// </summary>
public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
{
    return new CacheEnumerator<T>(source);
}

/// <summary>
/// IEnumerable wrapper around a shared CacheEntry: values already fetched
/// are served from the cache, the rest are pulled from the underlying
/// sequence on demand.
/// </summary>
private class CacheEnumerator<T> : IEnumerable<T>
{
    private CacheEntry<T> entry;

    public CacheEnumerator(IEnumerable<T> sequence)
    {
        entry = new CacheEntry<T>
        {
            Sequence = sequence.GetEnumerator(),
            CachedValues = new List<T>()
        };
    }

    public IEnumerator<T> GetEnumerator()
    {
        // Once the source is exhausted the plain list enumerator suffices;
        // otherwise go through the caching iterator.
        return entry.FullyPopulated
            ? (IEnumerator<T>)entry.CachedValues.GetEnumerator()
            : iterateSequence<T>(entry).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

// Walks the shared cache from the start, asking the entry to pull a new
// item from the source whenever the requested index is not cached yet.
// Safe to run from several enumerators at once because ensureItemAt locks.
private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    int index = 0;
    while (entry.ensureItemAt(index))
    {
        yield return entry.CachedValues[index];
        index++;
    }
}

private class CacheEntry<T>
{
    // True once the source sequence has been fully drained into CachedValues.
    public bool FullyPopulated { get; private set; }

    // Enumerator over the underlying (expensive) source sequence.
    public IEnumerator<T> Sequence { get; set; }

    // Values pulled from the source so far, in order.
    public List<T> CachedValues { get; set; }

    // Per-instance lock. The original used a *static* lock object, which
    // needlessly serialized every cache entry of the same element type
    // across the whole process; each entry only needs to guard its own
    // Sequence/CachedValues pair.
    private readonly object key = new object();

    /// <summary>
    /// Ensure that the cache has an item at the provided index. If not, take
    /// an item from the input sequence and move it to the cache.
    ///
    /// The method is thread safe.
    /// NOTE(review): the lock-free fast path reads CachedValues while another
    /// thread may be appending; List&lt;T&gt; does not guarantee a concurrent
    /// read during a resize is safe — confirm this is acceptable here.
    /// </summary>
    /// <returns>True if the cache already had enough items or
    /// an item was moved to the cache,
    /// false if there were no more items in the sequence.</returns>
    public bool ensureItemAt(int index)
    {
        // If the cache already has the item we don't need to lock to know we
        // can get it.
        if (index < CachedValues.Count)
            return true;
        // If we're done there's no race condition here either.
        if (FullyPopulated)
            return false;

        lock (key)
        {
            // Re-check the early-exit conditions in case they changed while
            // we were waiting on the lock.

            // We already have the cached item.
            if (index < CachedValues.Count)
                return true;
            // We don't have the cached item and there are no uncached items.
            if (FullyPopulated)
                return false;

            // We actually need to get the next item from the sequence.
            if (Sequence.MoveNext())
            {
                CachedValues.Add(Sequence.Current);
                return true;
            }
            else
            {
                // Source exhausted: release the enumerator and remember
                // that everything is now in CachedValues.
                Sequence.Dispose();
                FullyPopulated = true;
                return false;
            }
        }
    }
}

示例用法:

// Demo source sequence: yields 0 .. maxValue-1, sleeping one second before
// each item to simulate a slow extraction from a flat file.
private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    int current = 0;
    while (current < maxValue)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", current);
        yield return current;
        current++;
    }
}

// Demo driver: three tasks concurrently enumerate ONE cached sequence, so
// each value is generated exactly once but printed by every task.
public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .Cache();

    const int numThreads = 3;
    for (int workerIndex = 0; workerIndex < numThreads; workerIndex++)
    {
        // Copy the loop variable so each closure captures its own id.
        int taskID = workerIndex;
        Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}", taskID, value);
            }
        });
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}
于 2012-10-26T14:41:35.380 回答