我将生产者/消费者模式与 FileHelpers 库一起使用,使用多个线程从一个文件(可能很大)导入数据。每个线程都应该导入该文件的一部分,我想使用 FileHelperAsyncEngine 实例的 LineNumber 属性,该实例正在读取文件作为导入行的主键。FileHelperAsyncEngine 内部有一个 IEnumerator IEnumerable.GetEnumerator(); 使用 engine.ReadNext() 方法进行迭代。这在内部设置 LineNumber 属性(这似乎不是线程安全的)。
消费者将拥有与之关联的生产者,生产者将向消费者提供数据表,消费者将通过 SqlBulkLoad 类使用它们,该类将使用 IDataReader 实现,该实现将遍历消费者实例内部的数据表集合。的每个实例都将有一个与之关联的 SqlBulkCopy 实例。
我有线程锁定问题。下面是我如何创建多个 Producer 线程。我在后面开始每个线程。将调用生产者实例上的 Produce 方法来确定将处理哪个输入文件块。似乎 engine.LineNumber 不是线程安全的,我没有在数据库中导入正确的 LineNumber。似乎到那时 engine.LineNumber 被读取了其他一些名为 engine.ReadNext() 的线程并更改了 engine.LineNumber 属性。我不想锁定应该处理大量输入文件的循环,因为我失去了并行性。如何重组代码来解决这个线程问题?
谢谢拉德
for (int i = 0; i < numberOfProducerThreads; i++)
DataConsumer consumer = dataConsumers[i];
//create a new producer
DataProducer producer = new DataProducer();
//consumer has already being created
consumer.Subscribe(producer);
FileHelperAsyncEngine orderDetailEngine = new FileHelperAsyncEngine(recordType);
orderDetailEngine.Options.RecordCondition.Condition = RecordCondition.ExcludeIfBegins;
orderDetailEngine.Options.RecordCondition.Selector = STR_ORDR;
int skipLines = i * numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount;
Thread newThread = new Thread(() =>
{
producer.Produce(consumer, inputFilePath, lineNumberFieldName, dict, orderDetailEngine, skipLines, numberOfBufferTablesToProcess);
consumer.SetEndOfData(producer);
});
producerThreads.Add(newThread); thread.Start();}
public void Produce(DataConsumer consumer, string inputFilePath, string lineNumberFieldName, Dictionary<string, object> dict, FileHelperAsyncEngine engine, int skipLines, int numberOfBufferTablesToProcess)
{
lock (this)
{
engine.Options.IgnoreFirstLines = skipLines;
engine.BeginReadFile(inputFilePath);
}
int rowCount = 1;
DataTable buffer = consumer.BufferDataTable;
while (engine.ReadNext() != null)
{
lock (this)
{
dict[lineNumberFieldName] = engine.LineNumber;
buffer.Rows.Add(ObjectFieldsDataRowMapper.MapObjectFieldsToDataRow(engine.LastRecord, dict, buffer));
if (rowCount % DataBuffer.MaxBufferRowCount == 0)
{
consumer.AddBufferDataTable(buffer);
buffer = consumer.BufferDataTable;
}
if (rowCount % (numberOfBufferTablesToProcess * DataBuffer.MaxBufferRowCount) == 0)
{
break;
}
rowCount++;
}
}
if (buffer.Rows.Count > 0)
{
consumer.AddBufferDataTable(buffer);
}
engine.Close();
}