0

我有一个大约 5MB 的大型 CSV 数据库,其中包含我正在尝试导入 SQL Server CE 数据库的邮政编码、城市和州。

使用单线程,该过程估计需要大约 3 小时才能完成。虽然这对于完成工作很好,但我想尝试将任务拆分到多个线程中,以减少 3 小时的总时间。如果我在每个线程上创建一个SqlCeConnection对象,同时在每个线程上运行命令是否安全?

我感觉会有并发和死锁的问题。这是我找到 CSV 数据库的地方:http://www.unitedstateszipcodes.org/zip-code-database/

这是我的相关代码:

// Rows parsed from the CSV file; assigned by OpenCSV and iterated by ProcessData.
List<AddressSet> addressList;

/// <summary>
/// Parses the CSV file at <paramref name="file"/> into <c>addressList</c> and then starts a
/// worker thread that runs <c>ProcessData</c>.
/// </summary>
/// <remarks>
/// The first line is treated as a header and skipped; blank lines are ignored.
/// Columns 0, 2 and 5 supply the zip code, city and state. Each line is split only on commas
/// that lie outside double quotes, so a quoted field that itself contains commas does not
/// shift the columns that follow it.
/// </remarks>
/// <param name="file">Path of the CSV file to read.</param>
public void OpenCSV(string file)
{
    var addresses = from line in File.ReadLines(file).Skip(1)
                    where !string.IsNullOrWhiteSpace(line)
                    let columns = SplitCsvLine(line)
                    select new AddressSet
                    {
                        // Quotes are still stripped here, exactly as before; the splitter
                        // leaves the raw field text untouched.
                        ZipCode = columns[0].Replace("\"", "").Trim(),
                        City = columns[2].Replace("\"", "").Trim(),
                        State = columns[5].Replace("\"", "").Trim()
                    };
    addressList = addresses.ToList();

    Thread worker = new Thread(new ThreadStart(ProcessData));
    worker.Start();
}

/// <summary>
/// Splits one CSV line on commas that are not enclosed in double quotes.
/// The returned fields keep their original characters, including any quotes.
/// </summary>
private static string[] SplitCsvLine(string line)
{
    List<string> fields = new List<string>();
    int fieldStart = 0;
    bool insideQuotes = false;

    for (int pos = 0; pos < line.Length; pos++)
    {
        char ch = line[pos];
        if (ch == '"')
        {
            // Toggling on every quote also copes with doubled quotes ("") inside a field:
            // the pair toggles twice and leaves the state unchanged.
            insideQuotes = !insideQuotes;
        }
        else if (ch == ',' && !insideQuotes)
        {
            fields.Add(line.Substring(fieldStart, pos - fieldStart));
            fieldStart = pos + 1;
        }
    }

    // The text after the last separator is the final field.
    fields.Add(line.Substring(fieldStart));
    return fields.ToArray();
}

/// <summary>
/// Worker-thread body: for every entry of <c>addressList</c> it inserts (or looks up) the
/// state, zip code and city, links zip code and city, and posts a progress report to
/// <c>richTextBox1</c>.
/// </summary>
/// <remarks>
/// The progress text is built on the worker thread and handed to the UI thread as a finished
/// string. The delegate therefore no longer captures the running counter, which the worker
/// would already have incremented by the time the UI thread executed the delegate.
/// Any exception ends the loop and its message is shown in a message box.
/// </remarks>
private void ProcessData()
{
    try
    {
        int total = addressList.Count;
        int processed = 0;
        // UTC is used because only the elapsed time matters and it is immune to
        // local clock shifts such as daylight-saving changes.
        DateTime operationStart = DateTime.UtcNow;
        foreach (AddressSet address in addressList)
        {
            int stateId = InsertState(address.State);
            int zipCodeId = InsertZipCode(address.ZipCode, stateId);
            int cityId = InsertCity(address.City, stateId);

            UpdateRelationships(zipCodeId, cityId);
            processed++;

            float pct = processed / (float)total * 100;
            TimeSpan timeSinceStart = DateTime.UtcNow.Subtract(operationStart);
            // Linear extrapolation: elapsed time divided by the completed fraction.
            TimeSpan totalTime = TimeSpan.FromMilliseconds(timeSinceStart.TotalMilliseconds / (pct / 100));
            TimeSpan timeLeft = totalTime - timeSinceStart;

            string status = pct.ToString("N2") + "% (" + processed + " of " + total + ") " + address.City + ", " + address.State + " " + address.ZipCode
                + "\nEstimated Total Time: " + totalTime.ToString("h'h 'm'm 's's'") +
                "\nTime Left: " + timeLeft.ToString("h'h 'm'm 's's'") +
                "\nRunning Time: " + timeSinceStart.ToString("h'h 'm'm 's's'");

            // One posted delegate per row instead of three; the three UI updates still run
            // in the same order.
            richTextBox1.BeginInvoke((MethodInvoker)(() =>
            {
                richTextBox1.Text = status;
                richTextBox1.SelectionStart = richTextBox1.Text.Length;
                richTextBox1.ScrollToCaret();
            }));
        }
        this.Invoke(new Action(() =>
        {
            MessageBox.Show("Done!");
            btnChooseCSV.Enabled = true;
        }));
    }
    catch (Exception ex)
    {
        this.Invoke(new Action(() =>
        {
            MessageBox.Show(ex.Message);
        }));
    }
}

/// <summary>
/// Returns the <c>ZipCodeId</c> of the <c>ZipCode</c> row matching <paramref name="zipCode"/>,
/// inserting a new row first when no match exists.
/// </summary>
/// <remarks>
/// The id is looked up directly instead of counting rows first. If several rows match, the
/// first id is returned; previously the row count itself was returned as the id in that case.
/// The connection and commands are disposed even when a command throws.
/// NOTE(review): a connection is opened and closed on every call; reusing one connection per
/// worker thread would probably shorten the import, but the signature is kept unchanged here.
/// </remarks>
/// <param name="zipCode">Zip code text to look up or insert.</param>
/// <param name="stateId">State id stored with a newly inserted zip code.</param>
/// <returns>The id of the existing or newly inserted row.</returns>
private int InsertZipCode(string zipCode, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        using (SqlCeCommand lookup = new SqlCeCommand("SELECT ZipCodeId FROM ZipCode WHERE ZipCode = @ZipCode", connection))
        {
            lookup.Parameters.AddWithValue("ZipCode", zipCode);

            // ExecuteScalar yields null when the query returns no rows.
            object existingId = lookup.ExecuteScalar();
            if (existingId != null && existingId != DBNull.Value)
            {
                return Convert.ToInt32(existingId);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO ZipCode(ZipCode, StateId) VALUES(@ZipCode, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("ZipCode", zipCode);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Must run on the same connection as the INSERT to see the generated identity.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the <c>CityId</c> of the <c>City</c> row whose <c>CityName</c> equals
/// <paramref name="city"/>, inserting a new row first when no match exists.
/// </summary>
/// <remarks>
/// The id is looked up directly instead of counting rows first. If several rows match, the
/// first id is returned; previously the row count itself was returned as the id in that case.
/// The connection and commands are disposed even when a command throws.
/// NOTE(review): the lookup matches on CityName only, while the insert also stores StateId,
/// so same-named cities in different states resolve to one row - confirm whether StateId
/// should be part of the match.
/// </remarks>
/// <param name="city">City name to look up or insert.</param>
/// <param name="stateId">State id stored with a newly inserted city.</param>
/// <returns>The id of the existing or newly inserted row.</returns>
private int InsertCity(string city, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        using (SqlCeCommand lookup = new SqlCeCommand("SELECT CityId FROM City WHERE CityName = @City", connection))
        {
            lookup.Parameters.AddWithValue("City", city);

            // ExecuteScalar yields null when the query returns no rows.
            object existingId = lookup.ExecuteScalar();
            if (existingId != null && existingId != DBNull.Value)
            {
                return Convert.ToInt32(existingId);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO City(CityName, StateId) VALUES(@City, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("City", city);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Must run on the same connection as the INSERT to see the generated identity.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the <c>StateId</c> of the <c>State</c> row matching <paramref name="state"/>,
/// inserting a new row first when no match exists.
/// </summary>
/// <remarks>
/// The id is looked up directly instead of counting rows first. If several rows match, the
/// first id is returned; previously the row count itself was returned as the id in that case.
/// The connection and commands are disposed even when a command throws.
/// </remarks>
/// <param name="state">State value to look up or insert.</param>
/// <returns>The id of the existing or newly inserted row.</returns>
private int InsertState(string state)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        using (SqlCeCommand lookup = new SqlCeCommand("SELECT StateId FROM State WHERE State = @State", connection))
        {
            lookup.Parameters.AddWithValue("State", state);

            // ExecuteScalar yields null when the query returns no rows.
            object existingId = lookup.ExecuteScalar();
            if (existingId != null && existingId != DBNull.Value)
            {
                return Convert.ToInt32(existingId);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO State(State) VALUES(@State)", connection))
        {
            insert.Parameters.AddWithValue("State", state);
            insert.ExecuteNonQuery();
        }

        // Must run on the same connection as the INSERT to see the generated identity.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Inserts one row into <c>CityZipCode</c> linking the given city and zip code.
/// </summary>
/// <remarks>
/// The connection and command are wrapped in <c>using</c> so they are released even when the
/// insert throws. No check for an existing link is made; every call inserts a row.
/// </remarks>
/// <param name="zipCodeId">Id of the zip code row.</param>
/// <param name="cityId">Id of the city row.</param>
private void UpdateRelationships(int zipCodeId, int cityId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    using (SqlCeCommand command = new SqlCeCommand("INSERT INTO CityZipCode(CityId, ZipCodeId) VALUES(@CityId, @ZipCodeId)", connection))
    {
        connection.Open();

        command.Parameters.AddWithValue("CityId", cityId);
        command.Parameters.AddWithValue("ZipCodeId", zipCodeId);
        command.ExecuteNonQuery();
    }
}

编辑:

澄清一下,我不只是简单地从 CSV 文件中插入每一行信息。我通过将每个相应的项目插入单独的表并添加每个实体之间的关系来更改数据的布局方式。

例如,一个城市可以有多个邮政编码,而一个邮政编码有时可以覆盖多个城市,因此可以用多对多关系表示。城市和邮政编码只有一个州,因此这种关系是多对一的。

我有一张城市、邮政编码和州的表格。我还有一张将城市与邮政编码相关联的表格。我将需要修改我的关系表架构,以使同名城市可能存在于多个州。关系表实际上应该是一个包含城市、州和邮政编码的集合,而不仅仅是城市和邮政编码。

我的最终目标是将带有密码保护的 SQL Server CE 数据库与另一个用于城市、州和邮政编码验证的应用程序一起分发。我不想分发 CSV 数据库,因为任何人都可以更改它以通过验证。

4

2 回答 2

4

您必须为每个线程单独创建一个连接对象,因为 SqlCeConnection 在多个线程之间共享是不安全的:

SqlCeConnection 类

已编辑

SQL CE 对象不是线程安全的,也不是线程关联的。如果 SqlCeConnection 或 SqlCeTransaction 的实例在线程间共享而没有确保线程安全,则可能导致 Access Violation 异常。

建议每个线程应该使用单独的连接而不是共享。如果确实需要跨线程共享 SQL CE 对象,那么应用程序应该序列化对这些对象的访问。

使用 SQL Server Compact 进行多线程编程

为什么不使用 SQL Server Compact Toolbox?它可以根据 CSV 文件生成 INSERT 语句。

或者使用“CSV 到 SQL CE 数据库转换”应用程序。

于 2013-04-04T16:06:24.330 回答
1

只是一个建议,我正在做同样的事情,这就是我所拥有的,与简单的解决方案相比它非常快

/// <summary>
/// Joins <paramref name="path"/> and <paramref name="name"/> into one file path and loads
/// that CSV file through the single-argument overload.
/// </summary>
/// <param name="path">Directory part of the file location.</param>
/// <param name="name">File name part of the file location.</param>
/// <returns>The table produced by the single-argument overload.</returns>
public static DataTable CSVToDataTable(string path, string name)
{
    string fullPath = Path.Combine(path, name);
    return CSVToDataTable(fullPath);
}

/// <summary>
/// Reads the CSV file at <paramref name="path"/> into a <see cref="DataTable"/>.
/// </summary>
/// <remarks>
/// The first line supplies the column names. Blank lines after it are skipped. For each data
/// line, cells are copied into the row up to the smaller of the header count and the cell
/// count. A missing file, an empty file or a blank first line yields a table without columns.
/// </remarks>
/// <param name="path">Full path of the CSV file.</param>
/// <returns>The populated table, or an empty table as described above.</returns>
public static DataTable CSVToDataTable(string path)
{
    DataTable table = new DataTable();
    if (!File.Exists(path))
    {
        return table;
    }

    // FileShare.ReadWrite lets the file be opened while another handle has it open for writing.
    using (FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    using (StreamReader reader = new StreamReader(stream))
    {
        string headerLine = reader.ReadLine();
        if (headerLine == null || headerLine.IsNullOrWhiteSpace())
        {
            return table;
        }

        string[] headers = LineToArray(headerLine);
        foreach (string header in headers)
        {
            table.Columns.Add(header);
        }

        string current;
        while ((current = reader.ReadLine()) != null)
        {
            if (current.IsNullOrWhiteSpace())
            {
                continue;
            }

            string[] cells = LineToArray(current);
            DataRow row = table.NewRow();
            int limit = Math.Min(headers.Length, cells.Length);
            for (int col = 0; col < limit; col++)
            {
                row[col] = cells[col];
            }
            table.Rows.Add(row);
        }
    }

    return table;
}

/// <summary>
/// Splits one CSV line into its fields.
/// </summary>
/// <remarks>
/// A line without any double quote is split on every <paramref name="delimiter"/>.
/// Otherwise the line is scanned character by character: a quote at the start of a field
/// opens a quoted section in which delimiters are kept as text, a quote preceded by a
/// backslash inside a quoted section is kept as text (the backslash stays too), and any other
/// quote closes the section and is dropped.
/// Two defects of the earlier version are fixed: the last field of a quoted line is now
/// returned, and an empty quoted field ("") now closes its quoted section.
/// </remarks>
/// <param name="line">The CSV line; must not be null.</param>
/// <param name="delimiter">Field separator, a comma by default.</param>
/// <returns>The fields in order of appearance.</returns>
private static string[] LineToArray(string line, char delimiter = ',')
{
    if (!line.Contains("\""))
    {
        return line.Split(delimiter);
    }

    List<string> fields = new List<string>();
    System.Text.StringBuilder cell = new System.Text.StringBuilder();
    bool inQuotes = false;
    // Previous character while inside quotes; 'x' is a neutral placeholder otherwise.
    char lastCh = 'x';

    foreach (char ch in line)
    {
        if (ch == '"')
        {
            if (!inQuotes && cell.Length == 0)
            {
                inQuotes = true;
            }
            else if (lastCh == '\\')
            {
                // lastCh can only be a backslash while inside quotes.
                cell.Append(ch);
            }
            else
            {
                inQuotes = false;
            }
        }
        else if (ch == delimiter)
        {
            if (inQuotes)
            {
                cell.Append(ch);
            }
            else
            {
                fields.Add(cell.ToString());
                cell.Clear();
            }
        }
        else
        {
            cell.Append(ch);
        }

        lastCh = inQuotes ? ch : 'x';
    }

    // The text after the final delimiter is a field too.
    fields.Add(cell.ToString());
    return fields.ToArray();
}

/// <summary>
/// Loads the CSV file identified by <paramref name="path"/> and <paramref name="name"/> into
/// a DataTable and bulk-copies it into <paramref name="table"/>.
/// </summary>
/// <param name="path">Directory part of the CSV file location.</param>
/// <param name="name">File name part of the CSV file location.</param>
/// <param name="table">Name of the destination table.</param>
/// <param name="KeepNulls">When true, the KeepNulls bulk-copy option is set.</param>
public void insert(string path, string name, string table, bool KeepNulls)
{
    SqlCeBulkCopyOptions copyOptions = new SqlCeBulkCopyOptions();
    if (KeepNulls)
    {
        copyOptions |= SqlCeBulkCopyOptions.KeepNulls;
    }

    DataTable rows = CSVToDataTable(path, name);
    // Placeholder: adjust the loaded rows here before they are written.

    string connectionString = Fastway_Remote_Agent.Properties.Settings.Default.DatabaseConnectionString;
    using (SqlCeBulkCopy bulkCopy = new SqlCeBulkCopy(connectionString, copyOptions))
    {
        bulkCopy.DestinationTableName = table;
        bulkCopy.WriteToServer(rows);
    }
}

使用这个库:http://sqlcebulkcopy.codeplex.com/

另外,下面是用于多线程场景、按线程管理连接的连接池(请根据您的需要修改):

/// <summary>
/// Keeps one <c>DBCon</c> per managed thread and disposes connections whose owning thread
/// is no longer alive.
/// </summary>
public abstract class SqlCeConnectionPool
{
    private static Dictionary<int, DBCon> threadConnectionMap = new Dictionary<int, DBCon>();

    private static Dictionary<int, Thread> threadMap = new Dictionary<int, Thread>();

    /// <summary>
    /// The map from managed thread id to that thread's connection.
    /// </summary>
    public static Dictionary<int, DBCon> ThreadConnectionMap
    {
        get { return threadConnectionMap; }
    }

    /// <summary>
    /// Gets the connection string used for new connections.
    /// </summary>
    /// <value>The default connection string.</value>
    public static ConnectionString ConnectionString
    {
        get { return global::ConnectionString.Default; }
    }

    /// <summary>
    /// Gets the connection belonging to the calling thread, creating and registering one
    /// when the thread has none or its previous one is disposed, broken or closed.
    /// </summary>
    /// <remarks>
    /// Per the original author: intended for SQL Compact Edition only; with other providers
    /// this approach can exhaust the available connections.
    /// Every access first disposes and unregisters the connections of threads that have ended.
    /// NOTE(review): a new connection is not explicitly opened here (the Open call was
    /// commented out originally); presumably DBCon opens itself - confirm.
    /// </remarks>
    /// <returns>The connection registered for the calling thread.</returns>
    public static DBCon Connection
    {
        get
        {
            lock (threadConnectionMap)
            {
                PurgeDeadThreads();

                int currentId = Thread.CurrentThread.ManagedThreadId;

                DBCon current;
                if (threadConnectionMap.TryGetValue(currentId, out current))
                {
                    if (current.Disposed)
                    {
                        // Already disposed elsewhere: just drop the bookkeeping.
                        Forget(currentId);
                        current = null;
                    }
                    else if (current.Connection.State == ConnectionState.Broken
                        || current.Connection.State == ConnectionState.Closed)
                    {
                        current.Dispose();
                        Forget(currentId);
                        current = null;
                    }
                }

                if (current == null)
                {
                    current = new DBCon(ConnectionString);
                    threadConnectionMap[currentId] = current;
                    threadMap[currentId] = Thread.CurrentThread;
                }

                return current;
            }
        }
    }

    /// <summary>
    /// Disposes and unregisters every connection whose thread is unknown or no longer alive.
    /// Called while the lock on the connection map is held.
    /// </summary>
    private static void PurgeDeadThreads()
    {
        // Keys are collected first because the map cannot be modified while enumerating it.
        List<int> staleIds = new List<int>();
        foreach (KeyValuePair<int, DBCon> entry in threadConnectionMap)
        {
            Thread owner;
            if (threadMap.TryGetValue(entry.Key, out owner) && owner.IsAlive)
            {
                continue;
            }

            if (!entry.Value.Disposed)
            {
                entry.Value.Dispose();
            }
            staleIds.Add(entry.Key);
        }

        foreach (int staleId in staleIds)
        {
            threadMap.Remove(staleId);
            threadConnectionMap.Remove(staleId);
        }
    }

    /// <summary>
    /// Removes the entries for <paramref name="threadId"/> from both maps; missing keys are ignored.
    /// </summary>
    private static void Forget(int threadId)
    {
        threadConnectionMap.Remove(threadId);
        threadMap.Remove(threadId);
    }
}
于 2013-09-11T00:01:09.253 回答