我有一个模拟生成必须保存到数据库的数据。
ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
SaveDataToDatabase(cds);
});
模拟生成了一大堆数据,所以先生成然后保存到数据库(最多1 GB数据)是不切实际的,并且将它一个一个保存到数据库也没有意义(交易太小,不实用)。我想将它们作为受控大小的批量插入(比如一次提交 100)插入到数据库中。
但是,我认为我对并行计算的了解并不那么理论。我想出了这个(你可以看到这是非常有缺陷的):
DataBuffer buffer = new DataBuffer(...);
ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
ComplexDataSet cds = GenerateData(r);
buffer.SaveDataToBuffer(cds, i == r - 1);
});
public class DataBuffer
{
int count = 0;
int limit = 100
object _locker = new object();
ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }
public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
{
lock (_locker)
{
if(count >= limit)
{
ConcurrentBag<ComplexDataSet> dequeueRef;
if(ComplexDataBagQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
else
{
// First time
if(_lastItemRef == null)
{
_lastItemRef = new ConcurrentBag<ComplexDataSet>{data};
ComplexDataSetsQueue.Enqueue(_lastItemRef);
count = 1;
}
// If buffer isn't full
else
{
_lastItemRef.Add(data);
count++;
}
}
if(isfinalcycle)
{
// Commit everything that hasn't been committed yet
ConcurrentBag<ComplexDataSet> dequeueRef;
while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
{
Commit(dequeueRef);
}
}
}
}
public void Commit(ConcurrentBag<ComplexDataSet> data)
{
// Commit data to database..should this be somehow in another thread or something ?
}
}
如您所见,我使用队列来创建缓冲区,然后手动决定何时提交。但是,我有一种强烈的感觉,这对我的问题来说并不是很有效的解决方案。首先,我不确定我是否正确锁定。其次,我不确定这是否是完全线程安全的(或根本不安全)。
您能否看一下并评论我应该做些什么不同的事情?或者,如果有更好的方法来做到这一点(使用某种生产者-消费者技术或其他东西)?
谢谢和最良好的祝愿, D.