1

我正面临两难境地(!)。

在第一个场景中,我实现了一个解决方案,该解决方案使用 SQLBulkCopy 同步地将数据从一个数据库复制到另一个数据库,我完全没有问题。

现在,使用ThreadPool,我在异步场景中实现了相同的方法,每个表一个线程,一切正常,但是过了一段时间(通常是 1 小时,因为复制操作需要大约相同的时间),操作发送到ThreadPool停止执行。每个线程有一个不同的SQLBulkCopy使用一个不同的。SQLConnection

我已经看到了空闲线程的数量,并且它们在调用开始时都是空闲的。我有一个AutoResetEvent等待线程在再次启动之前完成它们的工作,以及一个保存活动线程计数器的信号量 FIFO。

在使用 SqlBulkCopy 时是否有一些我忘记或应该解决的问题?我感谢一些帮助,因为我的想法已经结束;)


->用法

SemaphoreFIFO waitingThreads = new SemaphoreFIFO();
AutoResetEvent autoResetEvent = new AutoResetEvent(false);
(...)
List<TableMappingHolder> list = BulkCopy.Mapping(tables);
waitingThreads.Put(list.Count, 300000);

for (int i = 0; i < list.Count; i++){
    ThreadPool.QueueUserWorkItem(call =>
         //Replication
         (...)
         waitingThreads.Get();

        if (waitingThreads.Counter == 0)
            autoResetEvent.Set();
    );
}

bool finalized = finalized = autoResetEvent.WaitOne(300000);
(...)

//批量复制

 public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings)
        {
            using (SqlConnection destinationConnection =
                            new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString()))
            {
                destinationConnection.Open();

                // Set up the bulk copy object.
                // Note that the column positions in the source
                // data reader match the column positions in
                // the destination table so there is no need to
                // map columns.
                using (SqlBulkCopy bulkCopy =
                           new SqlBulkCopy(destinationConnection))                    {
                    bulkCopy.BulkCopyTimeout = 300000;
                    bulkCopy.DestinationTableName = _destinationTableName;

                    // Set up the column mappings by name.
                    foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings)
                        bulkCopy.ColumnMappings.Add(columnMapping);

                    try{
                        // Write from the source to the destination.
                        bulkCopy.WriteToServer(reader);
                    }
                    catch (Exception ex){return false;}
                    finally
                    {
                        try{reader.Close();}
                        catch (Exception e){//log}
                        try{bulkCopy.Close();}
                        catch (Exception e){//log}
                        try{destinationConnection.Close(); }
                        catch (Exception e){ //log    }
                    }
                }
            }
            return true;
        }
#

信号

public sealed class SemaphoreFIFO
{
    private int _counter;
    private readonly LinkedList<int> waitQueue = new LinkedList<int>();

    public int Counter
    {
        get { return _counter; }
    }

    private void internalNotify()
    {
        if (waitQueue.Count > 0 && _counter == 0)
        {
            Monitor.PulseAll(waitQueue);
        }
    }

    public void Get()
    {
        lock (waitQueue)
        {
            _counter --;
            internalNotify();
        }
    }

    public bool Put(int n, int timeout)
    {
        if (timeout < 0 && timeout != Timeout.Infinite)
            throw new ArgumentOutOfRangeException("timeout");
        if (n < 0)
            throw new ArgumentOutOfRangeException("n");

        lock (waitQueue)
        {
            if (waitQueue.Count == 0 && _counter ==0)
            {
                _counter +=n;
                internalNotify();
                return true;
            }

            int endTime = Environment.TickCount + timeout;
            LinkedListNode<int> me = waitQueue.AddLast(n);
            try
            {
                while (true)
                {
                    Monitor.Wait(waitQueue, timeout);

                    if (waitQueue.First == me && _counter ==0)
                    {
                        _counter += n;
                        waitQueue.RemoveFirst();
                        internalNotify();
                        return true;
                    }

                    if (timeout != Timeout.Infinite)
                    {
                        int remainingTime = endTime - Environment.TickCount;
                        if (remainingTime <= 0)
                        {
                            // TIMEOUT
                            if (waitQueue.First == me)
                            {
                                waitQueue.RemoveFirst();
                                internalNotify();
                            }
                            else
                                waitQueue.Remove(me);
                            return false;
                        }
                        timeout = remainingTime;
                    }
                }
            }
            catch (ThreadInterruptedException e)
            {
                // INTERRUPT
                if (waitQueue.First == me)
                {
                    waitQueue.RemoveFirst();
                    internalNotify();
                }
                else
                    waitQueue.Remove(me);
                throw e;
            }
        }
    }
}
4

1 回答 1

1

我会回到同步使用 SQLBulkCopy。我不确定通过同时进行大量批量复制(而不是一个接一个)你会获得什么。它可能会更快地完成所有事情,但我什至不确定。

于 2010-04-06T17:39:58.273 回答