1

本周早些时候在 Stackoverflow 上获得了一些帮助,这导致了生产者/消费者模式的发展,用于加载处理并将大型数据集导入 RavenDb。 CPU 绑定任务的并行化继续与 IO 绑定

我现在希望限制生产者提前准备的工作单元的数量,以管理内存消耗。我已经使用基本信号量实现了节流,但我在某个点遇到了实现死锁的问题。

我无法弄清楚可能导致死锁的原因。以下是代码的摘录:

private static void LoadData<TParsedData, TData>(IDataLoader<TParsedData> dataLoader, int batchSize, Action<IndexedBatch<TData>> importProceedure, Func<IEnumerable<TParsedData>, List<TData>> processProceedure)
    where TParsedData : class
    where TData : class
{
    Console.WriteLine(@"Loading {0}...", typeof(TData).ToString());

    var batchCounter = 0;

    var ist = Stopwatch.StartNew();

    var throttler = new SemaphoreSlim(10);
    var bc = new BlockingCollection<IndexedBatch<TData>>();
    var importTask = Task.Run(() =>
    {
        bc.GetConsumingEnumerable()
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(data =>
            {
                var st = Stopwatch.StartNew();
                importProceedure(data);

                Console.WriteLine(@"Batch imported {0} in {1} ms", data.Index, st.ElapsedMilliseconds);
                throttler.Release();
            });
    });
    var processTask = Task.Run(() =>
    {
        dataLoader.GetParsedItems()
            .Partition(batchSize)
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            //or
            //.WithDegreeOfParallelism(1)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .ForAll(batch =>
            {
                throttler.Wait(); //.WaitAsync()
                var batchno = ++batchCounter;
                var st = Stopwatch.StartNew();

                bc.Add(new IndexedBatch<TData>(batchno, processProceedure(batch)));

                Console.WriteLine(@"Batch processed {0} in {1} ms", batchno, st.ElapsedMilliseconds);
            });
    });

    processTask.Wait();
    bc.CompleteAdding();
    importTask.Wait();

    Console.WriteLine(nl(1) + @"Loading {0} completed in {1} ms", typeof(TData).ToString(), ist.ElapsedMilliseconds);
}

public class IndexedBatch<TBatch> 
    where TBatch : class
{
    public IndexedBatch(int index, List<TBatch> batch)
    {
        Index = index;
        Batch = batch ?? new List<TBatch>();
    }

    public int Index { get; set; }
    public List<TBatch> Batch { get; set; }
}

这是对 LoadData 的调用:

LoadData<DataBase, Data>(
    DataLoaderFactory.Create<DataBase>(datafilePath),
    1024,
    (data) =>
    {
        using (var session = Store.OpenSession())
        {
            foreach (var i in data.Batch)
            {
                session.Store(i);
                d.TryAdd(i.LongId.GetHashCode(), int.Parse(i.Id.Substring(i.Id.LastIndexOf('/') + 1)));
            }
            session.SaveChanges();
        }
    },
    (batch) =>
    {
        return batch.Select(i => new Data()
        {
            ...
        }).ToList();
    }
);

Store 是一个 RavenDb IDocumentStore。DataLoaderFactory 为给定数据集构造一个自定义解析器。

4

2 回答 2

1

你能检查你有多少线程吗?由于阻塞,您可能已经耗尽了线程池。TPL 注入的线程比ProcessorCount它认为没有它们你的代码会死锁的线程多。但它只能在一定限度内这样做。

无论如何,在 TPL 任务内部阻塞通常不是一个好主意,因为内置的启发式方法最适合非阻塞的东西。

于 2012-07-29T23:10:32.967 回答
1

很难调试没有大箭头的死锁,上面写着“在这里阻塞!”。避免在没有调试器的情况下调试代码:BlockingCollection 已经可以节流。使用接受参数的构造int boundedCapacity函数并消除信号量。解决您的僵局的几率非常高。

于 2012-07-29T17:52:19.587 回答