1

我将生产者/消费者模式与 FileHelpers 库一起使用,使用多个线程从一个文件(可能很大)导入数据。每个线程都应该导入该文件的一部分,我想使用 FileHelperAsyncEngine 实例的 LineNumber 属性,该实例正在读取文件作为导入行的主键。FileHelperAsyncEngine 内部有一个 IEnumerator IEnumerable.GetEnumerator(); 使用 engine.ReadNext() 方法进行迭代。这在内部设置 LineNumber 属性(这似乎不是线程安全的)。

消费者将拥有与之关联的生产者,生产者将向消费者提供数据表,消费者将通过 SqlBulkLoad 类使用它们,该类将使用 IDataReader 实现,该实现将遍历消费者实例内部的数据表集合。的每个实例都将有一个与之关联的 SqlBulkCopy 实例。

我有线程锁定问题。下面是我如何创建多个 Producer 线程。我在后面开始每个线程。将调用生产者实例上的 Produce 方法来确定将处理哪个输入文件块。似乎 engine.LineNumber 不是线程安全的,我没有在数据库中导入正确的 LineNumber。似乎到那时 engine.LineNumber 被读取了其他一些名为 engine.ReadNext() 的线程并更改了 engine.LineNumber 属性。我不想锁定应该处理大量输入文件的循环,因为我失去了并行性。如何重组代码来解决这个线程问题?

谢谢拉德

            for (int i = 0; i < numberOfProducerThreads; i++)
            DataConsumer consumer = dataConsumers[i];

            //create a new producer
            DataProducer producer = new DataProducer();

            //consumer has already being created
            consumer.Subscribe(producer);

            FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
            orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
            orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;

            int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;

            Thread newThread = new Thread(() =>
            {
                producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
                consumer.SetEndOfData(producer);
            }); 
            producerThreads.Add(newThread); thread.Start();}

    public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
    {
        lock (this)
        {
            engine.Options.IgnoreFirstLines = skipLines;
            engine.BeginReadFile(inputFilePath);
        }

        int rowCount = 1;

        DataTable buffer = consumer.BufferDataTable;
        while (engine.ReadNext() != null)
        {
            lock (this)
            {
                dict[lineNumberFieldName] = engine.LineNumber;
                buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
                if (rowCount % DataBuffer.MaxBufferRowCount == 0)
                {
                    consumer.AddBufferDataTable(buffer);
                    buffer = consumer.BufferDataTable;
                }
                if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
                {
                    break;
                }
                rowCount++;
            }
        }
        if (buffer.Rows.Count > 0)
        {
            consumer.AddBufferDataTable(buffer);
        }
        engine.Close();
    }
4

3 回答 3

2

Dictionary<> 不是线程安全的。上面代码中的字典是否被正确锁定或仅用于您的锁(this)?

顺便说一句,我会避免使用 lock(this) 范例并使用通用对象来锁定您的代码。您可能会遇到与特定资源无关的其他锁定问题。我在我的博客(C# .Net 中用于线程安全代码的智能资源锁定)中详细介绍了该问题。高温高压

于 2010-04-12T19:52:22.633 回答
1

你是对的 LineNumber 不是线程安全的:(

我只是调查了代码,发现我们从内部阅读器中读取了该行,然后更新了 LineNumber,因此根本不是线程安全的。

问题是,如果我们在里面添加一些 sincronization 代码,我们可以让事情变得更慢,也许我们需要创建内部代码的线程安全版本来避免这种开销。

无论如何,我认为从性能的角度来看,代码中较慢的部分是文件操作,因此使用多个线程进行读取不会加快速度。也许您只需要一个线程将文件读取到工作队列,并且有多个线程读取它并处理每条记录,在这种情况下,您可以获得所需的线程安全

干杯

于 2010-05-03T12:16:22.570 回答
0

我想我纠正了这个问题。需要锁的是 Dictionary<>

lock (dict) { dict[lineNumberFieldName] = engine.LineNumber; buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer)); 感谢 OmegaMan 提供了一个很好的线索。

于 2010-04-13T18:34:15.587 回答