我正面临两难境地(!)。
在第一个场景中,我实现了一个解决方案,该解决方案使用 SQLBulkCopy 同步地将数据从一个数据库复制到另一个数据库,我完全没有问题。
现在,使用ThreadPool
,我在异步场景中实现了相同的方法,每个表一个线程,一切正常,但是过了一段时间(通常是 1 小时,因为复制操作需要大约相同的时间),操作发送到ThreadPool
停止执行。每个线程有一个不同的SQLBulkCopy
使用一个不同的。SQLConnection
我已经看到了空闲线程的数量,并且它们在调用开始时都是空闲的。我有一个AutoResetEvent
等待线程在再次启动之前完成它们的工作,以及一个保存活动线程计数器的信号量 FIFO。
在使用 SqlBulkCopy 时是否有一些我忘记或应该解决的问题?我感谢一些帮助,因为我的想法已经结束;)
->用法
SemaphoreFIFO waitingThreads = new SemaphoreFIFO();
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
(...)
List<TableMappingHolder> list = BulkCopy.Mapping(tables);
waitingThreads.Put(list.Count, 300000);
for (int i = 0; i < list.Count; i++){
ThreadPool.QueueUserWorkItem(call =>
//Replication
(...)
waitingThreads.Get();
if (waitingThreads.Counter == 0)
autoResetEvent.Set();
);
}
bool finalized = finalized = autoResetEvent.WaitOne(300000);
(...)
//批量复制
public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings)
{
using (SqlConnection destinationConnection =
new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString()))
{
destinationConnection.Open();
// Set up the bulk copy object.
// Note that the column positions in the source
// data reader match the column positions in
// the destination table so there is no need to
// map columns.
using (SqlBulkCopy bulkCopy =
new SqlBulkCopy(destinationConnection)) {
bulkCopy.BulkCopyTimeout = 300000;
bulkCopy.DestinationTableName = _destinationTableName;
// Set up the column mappings by name.
foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings)
bulkCopy.ColumnMappings.Add(columnMapping);
try{
// Write from the source to the destination.
bulkCopy.WriteToServer(reader);
}
catch (Exception ex){return false;}
finally
{
try{reader.Close();}
catch (Exception e){//log}
try{bulkCopy.Close();}
catch (Exception e){//log}
try{destinationConnection.Close(); }
catch (Exception e){ //log }
}
}
}
return true;
}
#
信号
public sealed class SemaphoreFIFO
{
private int _counter;
private readonly LinkedList<int> waitQueue = new LinkedList<int>();
public int Counter
{
get { return _counter; }
}
private void internalNotify()
{
if (waitQueue.Count > 0 && _counter == 0)
{
Monitor.PulseAll(waitQueue);
}
}
public void Get()
{
lock (waitQueue)
{
_counter --;
internalNotify();
}
}
public bool Put(int n, int timeout)
{
if (timeout < 0 && timeout != Timeout.Infinite)
throw new ArgumentOutOfRangeException("timeout");
if (n < 0)
throw new ArgumentOutOfRangeException("n");
lock (waitQueue)
{
if (waitQueue.Count == 0 && _counter ==0)
{
_counter +=n;
internalNotify();
return true;
}
int endTime = Environment.TickCount + timeout;
LinkedListNode<int> me = waitQueue.AddLast(n);
try
{
while (true)
{
Monitor.Wait(waitQueue, timeout);
if (waitQueue.First == me && _counter ==0)
{
_counter += n;
waitQueue.RemoveFirst();
internalNotify();
return true;
}
if (timeout != Timeout.Infinite)
{
int remainingTime = endTime - Environment.TickCount;
if (remainingTime <= 0)
{
// TIMEOUT
if (waitQueue.First == me)
{
waitQueue.RemoveFirst();
internalNotify();
}
else
waitQueue.Remove(me);
return false;
}
timeout = remainingTime;
}
}
}
catch (ThreadInterruptedException e)
{
// INTERRUPT
if (waitQueue.First == me)
{
waitQueue.RemoveFirst();
internalNotify();
}
else
waitQueue.Remove(me);
throw e;
}
}
}
}